Skip to content

Commit f8b0a1a

Browse files
committed
Adding OSS Notification Classes for SMIGRATING and SMIGRATED. Handling of SMIGRATING is completed and covered with tests. (#3849)
* Adding maintenance notifications for OSS API-enabled connections * Adding OSS maintenance notification handler configurations to parsers. Placeholder for the SMIGRATED handler in the OSSMaintNotificationsHandler class * Applying review comments. Finalized the notifications format.
1 parent 7c9fcd8 commit f8b0a1a

File tree

7 files changed

+631
-65
lines changed

7 files changed

+631
-65
lines changed

redis/_parsers/base.py

Lines changed: 81 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
NodeMigratedNotification,
1212
NodeMigratingNotification,
1313
NodeMovingNotification,
14+
OSSNodeMigratedNotification,
15+
OSSNodeMigratingNotification,
1416
)
1517

1618
if sys.version_info.major >= 3 and sys.version_info.minor >= 11:
@@ -178,16 +180,39 @@ async def read_response(
178180
class MaintenanceNotificationsParser:
179181
"""Protocol defining maintenance push notification parsing functionality"""
180182

183+
@staticmethod
184+
def parse_oss_maintenance_start_msg(response):
185+
# Expected message format is:
186+
# SMIGRATING <seq_number> <slot, range1-range2,...>
187+
id = response[1]
188+
slots = response[2]
189+
return OSSNodeMigratingNotification(id, slots)
190+
191+
@staticmethod
192+
def parse_oss_maintenance_completed_msg(response):
193+
# Expected message format is:
194+
# SMIGRATED <seq_number> <host:port> <slot, range1-range2,...>
195+
id = response[1]
196+
node_address = response[2]
197+
slots = response[3]
198+
return OSSNodeMigratedNotification(id, node_address, slots)
199+
181200
@staticmethod
182201
def parse_maintenance_start_msg(response, notification_type):
183202
# Expected message format is: <notification_type> <seq_number> <time>
203+
# Examples:
204+
# MIGRATING 1 10
205+
# FAILING_OVER 2 20
184206
id = response[1]
185207
ttl = response[2]
186208
return notification_type(id, ttl)
187209

188210
@staticmethod
189211
def parse_maintenance_completed_msg(response, notification_type):
190212
# Expected message format is: <notification_type> <seq_number>
213+
# Examples:
214+
# MIGRATED 1
215+
# FAILED_OVER 2
191216
id = response[1]
192217
return notification_type(id)
193218

@@ -214,12 +239,15 @@ def parse_moving_msg(response):
214239
_MIGRATED_MESSAGE = "MIGRATED"
215240
_FAILING_OVER_MESSAGE = "FAILING_OVER"
216241
_FAILED_OVER_MESSAGE = "FAILED_OVER"
242+
_SMIGRATING_MESSAGE = "SMIGRATING"
243+
_SMIGRATED_MESSAGE = "SMIGRATED"
217244

218245
_MAINTENANCE_MESSAGES = (
219246
_MIGRATING_MESSAGE,
220247
_MIGRATED_MESSAGE,
221248
_FAILING_OVER_MESSAGE,
222249
_FAILED_OVER_MESSAGE,
250+
_SMIGRATING_MESSAGE,
223251
)
224252

225253
MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING: dict[
@@ -245,6 +273,14 @@ def parse_moving_msg(response):
245273
NodeMovingNotification,
246274
MaintenanceNotificationsParser.parse_moving_msg,
247275
),
276+
_SMIGRATING_MESSAGE: (
277+
OSSNodeMigratingNotification,
278+
MaintenanceNotificationsParser.parse_oss_maintenance_start_msg,
279+
),
280+
_SMIGRATED_MESSAGE: (
281+
OSSNodeMigratedNotification,
282+
MaintenanceNotificationsParser.parse_oss_maintenance_completed_msg,
283+
),
248284
}
249285

250286

@@ -255,6 +291,7 @@ class PushNotificationsParser(Protocol):
255291
invalidation_push_handler_func: Optional[Callable] = None
256292
node_moving_push_handler_func: Optional[Callable] = None
257293
maintenance_push_handler_func: Optional[Callable] = None
294+
oss_cluster_maint_push_handler_func: Optional[Callable] = None
258295

259296
def handle_pubsub_push_response(self, response):
260297
"""Handle pubsub push responses"""
@@ -269,6 +306,7 @@ def handle_push_response(self, response, **kwargs):
269306
_INVALIDATION_MESSAGE,
270307
*_MAINTENANCE_MESSAGES,
271308
_MOVING_MESSAGE,
309+
_SMIGRATED_MESSAGE,
272310
):
273311
return self.pubsub_push_handler_func(response)
274312

@@ -291,13 +329,27 @@ def handle_push_response(self, response, **kwargs):
291329
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
292330
msg_type
293331
][1]
294-
notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
295-
msg_type
296-
][0]
297-
notification = parser_function(response, notification_type)
332+
if msg_type == _SMIGRATING_MESSAGE:
333+
notification = parser_function(response)
334+
else:
335+
notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
336+
msg_type
337+
][0]
338+
notification = parser_function(response, notification_type)
298339

299340
if notification is not None:
300341
return self.maintenance_push_handler_func(notification)
342+
if (
343+
msg_type == _SMIGRATED_MESSAGE
344+
and self.oss_cluster_maint_push_handler_func
345+
):
346+
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
347+
msg_type
348+
][1]
349+
notification = parser_function(response)
350+
351+
if notification is not None:
352+
return self.oss_cluster_maint_push_handler_func(notification)
301353
except Exception as e:
302354
logger.error(
303355
"Error handling {} message ({}): {}".format(msg_type, response, e)
@@ -317,6 +369,9 @@ def set_node_moving_push_handler(self, node_moving_push_handler_func):
317369
def set_maintenance_push_handler(self, maintenance_push_handler_func):
318370
self.maintenance_push_handler_func = maintenance_push_handler_func
319371

372+
def set_oss_cluster_maint_push_handler(self, oss_cluster_maint_push_handler_func):
373+
self.oss_cluster_maint_push_handler_func = oss_cluster_maint_push_handler_func
374+
320375

321376
class AsyncPushNotificationsParser(Protocol):
322377
"""Protocol defining async RESP3-specific parsing functionality"""
@@ -325,6 +380,7 @@ class AsyncPushNotificationsParser(Protocol):
325380
invalidation_push_handler_func: Optional[Callable] = None
326381
node_moving_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
327382
maintenance_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
383+
oss_cluster_maint_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
328384

329385
async def handle_pubsub_push_response(self, response):
330386
"""Handle pubsub push responses asynchronously"""
@@ -341,6 +397,7 @@ async def handle_push_response(self, response, **kwargs):
341397
_INVALIDATION_MESSAGE,
342398
*_MAINTENANCE_MESSAGES,
343399
_MOVING_MESSAGE,
400+
_SMIGRATED_MESSAGE,
344401
):
345402
return await self.pubsub_push_handler_func(response)
346403

@@ -365,13 +422,26 @@ async def handle_push_response(self, response, **kwargs):
365422
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
366423
msg_type
367424
][1]
368-
notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
369-
msg_type
370-
][0]
371-
notification = parser_function(response, notification_type)
425+
if msg_type == _SMIGRATING_MESSAGE:
426+
notification = parser_function(response)
427+
else:
428+
notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
429+
msg_type
430+
][0]
431+
notification = parser_function(response, notification_type)
372432

373433
if notification is not None:
374434
return await self.maintenance_push_handler_func(notification)
435+
if (
436+
msg_type == _SMIGRATED_MESSAGE
437+
and self.oss_cluster_maint_push_handler_func
438+
):
439+
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
440+
msg_type
441+
][1]
442+
notification = parser_function(response)
443+
if notification is not None:
444+
return await self.oss_cluster_maint_push_handler_func(notification)
375445
except Exception as e:
376446
logger.error(
377447
"Error handling {} message ({}): {}".format(msg_type, response, e)
@@ -393,6 +463,9 @@ def set_node_moving_push_handler(self, node_moving_push_handler_func):
393463
def set_maintenance_push_handler(self, maintenance_push_handler_func):
394464
self.maintenance_push_handler_func = maintenance_push_handler_func
395465

466+
def set_oss_cluster_maint_push_handler(self, oss_cluster_maint_push_handler_func):
467+
self.oss_cluster_maint_push_handler_func = oss_cluster_maint_push_handler_func
468+
396469

397470
class _AsyncRESPBase(AsyncBaseParser):
398471
"""Base class for async resp parsing"""

redis/_parsers/hiredis.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ def __init__(self, socket_read_size):
4949
self.pubsub_push_handler_func = self.handle_pubsub_push_response
5050
self.node_moving_push_handler_func = None
5151
self.maintenance_push_handler_func = None
52+
self.oss_cluster_maint_push_handler_func = None
5253
self.invalidation_push_handler_func = None
5354
self._hiredis_PushNotificationType = None
5455

redis/_parsers/resp3.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ def __init__(self, socket_read_size):
2020
self.pubsub_push_handler_func = self.handle_pubsub_push_response
2121
self.node_moving_push_handler_func = None
2222
self.maintenance_push_handler_func = None
23+
self.oss_cluster_maint_push_handler_func = None
2324
self.invalidation_push_handler_func = None
2425

2526
def handle_pubsub_push_response(self, response):

redis/maint_notifications.py

Lines changed: 152 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,13 @@
55
import threading
66
import time
77
from abc import ABC, abstractmethod
8-
from typing import TYPE_CHECKING, Literal, Optional, Union
8+
from typing import TYPE_CHECKING, List, Literal, Optional, Union
99

1010
from redis.typing import Number
1111

12+
if TYPE_CHECKING:
13+
from redis.cluster import NodesManager
14+
1215

1316
class MaintenanceState(enum.Enum):
1417
NONE = "none"
@@ -394,6 +397,125 @@ def __hash__(self) -> int:
394397
return hash((self.__class__.__name__, int(self.id)))
395398

396399

400+
class OSSNodeMigratingNotification(MaintenanceNotification):
401+
"""
402+
Notification for when a Redis OSS API client is used and a node is in the process of migrating slots.
403+
404+
This notification is received when a node starts migrating its slots to another node
405+
during cluster rebalancing or maintenance operations.
406+
407+
Args:
408+
id (int): Unique identifier for this notification
409+
slots (Optional[List[int]]): List of slots being migrated
410+
"""
411+
412+
DEFAULT_TTL = 30
413+
414+
def __init__(
415+
self,
416+
id: int,
417+
slots: Optional[List[int]] = None,
418+
):
419+
super().__init__(id, OSSNodeMigratingNotification.DEFAULT_TTL)
420+
self.slots = slots
421+
422+
def __repr__(self) -> str:
423+
expiry_time = self.creation_time + self.ttl
424+
remaining = max(0, expiry_time - time.monotonic())
425+
return (
426+
f"{self.__class__.__name__}("
427+
f"id={self.id}, "
428+
f"slots={self.slots}, "
429+
f"ttl={self.ttl}, "
430+
f"creation_time={self.creation_time}, "
431+
f"expires_at={expiry_time}, "
432+
f"remaining={remaining:.1f}s, "
433+
f"expired={self.is_expired()}"
434+
f")"
435+
)
436+
437+
def __eq__(self, other) -> bool:
438+
"""
439+
Two OSSNodeMigratingNotification notifications are considered equal if they have the same
440+
id and are of the same type.
441+
"""
442+
if not isinstance(other, OSSNodeMigratingNotification):
443+
return False
444+
return self.id == other.id and type(self) is type(other)
445+
446+
def __hash__(self) -> int:
447+
"""
448+
Return a hash value for the notification to allow
449+
instances to be used in sets and as dictionary keys.
450+
451+
Returns:
452+
int: Hash value based on notification type and id
453+
"""
454+
return hash((self.__class__.__name__, int(self.id)))
455+
456+
457+
class OSSNodeMigratedNotification(MaintenanceNotification):
458+
"""
459+
Notification for when a Redis OSS API client is used and a node has completed migrating slots.
460+
461+
This notification is received when a node has finished migrating all its slots
462+
to other nodes during cluster rebalancing or maintenance operations.
463+
464+
Args:
465+
id (int): Unique identifier for this notification
466+
node_address (Optional[str]): Address of the node that has
467+
completed migration - this is the destination node.
468+
slots (Optional[List[int]]): List of slots that have been migrated
469+
"""
470+
471+
DEFAULT_TTL = 30
472+
473+
def __init__(
474+
self,
475+
id: int,
476+
node_address: Optional[str] = None,
477+
slots: Optional[List[int]] = None,
478+
):
479+
super().__init__(id, OSSNodeMigratedNotification.DEFAULT_TTL)
480+
self.node_address = node_address
481+
self.slots = slots
482+
483+
def __repr__(self) -> str:
484+
expiry_time = self.creation_time + self.ttl
485+
remaining = max(0, expiry_time - time.monotonic())
486+
return (
487+
f"{self.__class__.__name__}("
488+
f"id={self.id}, "
489+
f"node_address={self.node_address}, "
490+
f"slots={self.slots}, "
491+
f"ttl={self.ttl}, "
492+
f"creation_time={self.creation_time}, "
493+
f"expires_at={expiry_time}, "
494+
f"remaining={remaining:.1f}s, "
495+
f"expired={self.is_expired()}"
496+
f")"
497+
)
498+
499+
def __eq__(self, other) -> bool:
500+
"""
501+
Two OSSNodeMigratedNotification notifications are considered equal if they have the same
502+
id and are of the same type.
503+
"""
504+
if not isinstance(other, OSSNodeMigratedNotification):
505+
return False
506+
return self.id == other.id and type(self) is type(other)
507+
508+
def __hash__(self) -> int:
509+
"""
510+
Return a hash value for the notification to allow
511+
instances to be used in sets and as dictionary keys.
512+
513+
Returns:
514+
int: Hash value based on notification type and id
515+
"""
516+
return hash((self.__class__.__name__, int(self.id)))
517+
518+
397519
def _is_private_fqdn(host: str) -> bool:
398520
"""
399521
Determine if an FQDN is likely to be internal/private.
@@ -755,6 +877,7 @@ class MaintNotificationsConnectionHandler:
755877
_NOTIFICATION_TYPES: dict[type["MaintenanceNotification"], int] = {
756878
NodeMigratingNotification: 1,
757879
NodeFailingOverNotification: 1,
880+
OSSNodeMigratingNotification: 1,
758881
NodeMigratedNotification: 0,
759882
NodeFailedOverNotification: 0,
760883
}
@@ -808,3 +931,31 @@ def handle_maintenance_completed_notification(self):
808931
# timeouts by providing -1 as the relaxed timeout
809932
self.connection.update_current_socket_timeout(-1)
810933
self.connection.maintenance_state = MaintenanceState.NONE
934+
935+
936+
class OSSMaintNotificationsHandler:
937+
def __init__(
938+
self, nodes_manager: "NodesManager", config: MaintNotificationsConfig
939+
) -> None:
940+
self.nodes_manager = nodes_manager
941+
self.config = config
942+
self._processed_notifications = set()
943+
self._lock = threading.RLock()
944+
945+
def remove_expired_notifications(self):
946+
with self._lock:
947+
for notification in tuple(self._processed_notifications):
948+
if notification.is_expired():
949+
self._processed_notifications.remove(notification)
950+
951+
def handle_notification(self, notification: MaintenanceNotification):
952+
if isinstance(notification, OSSNodeMigratedNotification):
953+
self.handle_oss_maintenance_completed_notification(notification)
954+
else:
955+
logging.error(f"Unhandled notification type: {notification}")
956+
957+
def handle_oss_maintenance_completed_notification(
958+
self, notification: OSSNodeMigratedNotification
959+
):
960+
self.remove_expired_notifications()
961+
logging.info(f"Received OSS maintenance completed notification: {notification}")

0 commit comments

Comments
 (0)