1111 NodeMigratedNotification ,
1212 NodeMigratingNotification ,
1313 NodeMovingNotification ,
14+ OSSNodeMigratedNotification ,
15+ OSSNodeMigratingNotification ,
1416)
1517
1618if sys .version_info .major >= 3 and sys .version_info .minor >= 11 :
@@ -178,16 +180,39 @@ async def read_response(
178180class MaintenanceNotificationsParser :
179181 """Protocol defining maintenance push notification parsing functionality"""
180182
183+ @staticmethod
184+ def parse_oss_maintenance_start_msg (response ):
185+ # Expected message format is:
186+ # SMIGRATING <seq_number> <slot, range1-range2,...>
187+ id = response [1 ]
188+ slots = response [2 ]
189+ return OSSNodeMigratingNotification (id , slots )
190+
191+ @staticmethod
192+ def parse_oss_maintenance_completed_msg (response ):
193+ # Expected message format is:
194+ # SMIGRATED <seq_number> <host:port> <slot, range1-range2,...>
195+ id = response [1 ]
196+ node_address = response [2 ]
197+ slots = response [3 ]
198+ return OSSNodeMigratedNotification (id , node_address , slots )
199+
181200 @staticmethod
182201 def parse_maintenance_start_msg (response , notification_type ):
183202 # Expected message format is: <notification_type> <seq_number> <time>
203+ # Examples:
204+ # MIGRATING 1 10
205+ # FAILING_OVER 2 20
184206 id = response [1 ]
185207 ttl = response [2 ]
186208 return notification_type (id , ttl )
187209
188210 @staticmethod
189211 def parse_maintenance_completed_msg (response , notification_type ):
190212 # Expected message format is: <notification_type> <seq_number>
213+ # Examples:
214+ # MIGRATED 1
215+ # FAILED_OVER 2
191216 id = response [1 ]
192217 return notification_type (id )
193218
@@ -214,12 +239,15 @@ def parse_moving_msg(response):
214239_MIGRATED_MESSAGE = "MIGRATED"
215240_FAILING_OVER_MESSAGE = "FAILING_OVER"
216241_FAILED_OVER_MESSAGE = "FAILED_OVER"
242+ _SMIGRATING_MESSAGE = "SMIGRATING"
243+ _SMIGRATED_MESSAGE = "SMIGRATED"
217244
218245_MAINTENANCE_MESSAGES = (
219246 _MIGRATING_MESSAGE ,
220247 _MIGRATED_MESSAGE ,
221248 _FAILING_OVER_MESSAGE ,
222249 _FAILED_OVER_MESSAGE ,
250+ _SMIGRATING_MESSAGE ,
223251)
224252
225253MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING : dict [
@@ -245,6 +273,14 @@ def parse_moving_msg(response):
245273 NodeMovingNotification ,
246274 MaintenanceNotificationsParser .parse_moving_msg ,
247275 ),
276+ _SMIGRATING_MESSAGE : (
277+ OSSNodeMigratingNotification ,
278+ MaintenanceNotificationsParser .parse_oss_maintenance_start_msg ,
279+ ),
280+ _SMIGRATED_MESSAGE : (
281+ OSSNodeMigratedNotification ,
282+ MaintenanceNotificationsParser .parse_oss_maintenance_completed_msg ,
283+ ),
248284}
249285
250286
@@ -255,6 +291,7 @@ class PushNotificationsParser(Protocol):
255291 invalidation_push_handler_func : Optional [Callable ] = None
256292 node_moving_push_handler_func : Optional [Callable ] = None
257293 maintenance_push_handler_func : Optional [Callable ] = None
294+ oss_cluster_maint_push_handler_func : Optional [Callable ] = None
258295
259296 def handle_pubsub_push_response (self , response ):
260297 """Handle pubsub push responses"""
@@ -269,6 +306,7 @@ def handle_push_response(self, response, **kwargs):
269306 _INVALIDATION_MESSAGE ,
270307 * _MAINTENANCE_MESSAGES ,
271308 _MOVING_MESSAGE ,
309+ _SMIGRATED_MESSAGE ,
272310 ):
273311 return self .pubsub_push_handler_func (response )
274312
@@ -291,13 +329,27 @@ def handle_push_response(self, response, **kwargs):
291329 parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING [
292330 msg_type
293331 ][1 ]
294- notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING [
295- msg_type
296- ][0 ]
297- notification = parser_function (response , notification_type )
332+ if msg_type == _SMIGRATING_MESSAGE :
333+ notification = parser_function (response )
334+ else :
335+ notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING [
336+ msg_type
337+ ][0 ]
338+ notification = parser_function (response , notification_type )
298339
299340 if notification is not None :
300341 return self .maintenance_push_handler_func (notification )
342+ if (
343+ msg_type == _SMIGRATED_MESSAGE
344+ and self .oss_cluster_maint_push_handler_func
345+ ):
346+ parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING [
347+ msg_type
348+ ][1 ]
349+ notification = parser_function (response )
350+
351+ if notification is not None :
352+ return self .oss_cluster_maint_push_handler_func (notification )
301353 except Exception as e :
302354 logger .error (
303355 "Error handling {} message ({}): {}" .format (msg_type , response , e )
@@ -317,6 +369,9 @@ def set_node_moving_push_handler(self, node_moving_push_handler_func):
317369 def set_maintenance_push_handler (self , maintenance_push_handler_func ):
318370 self .maintenance_push_handler_func = maintenance_push_handler_func
319371
372+ def set_oss_cluster_maint_push_handler (self , oss_cluster_maint_push_handler_func ):
373+ self .oss_cluster_maint_push_handler_func = oss_cluster_maint_push_handler_func
374+
320375
321376class AsyncPushNotificationsParser (Protocol ):
322377 """Protocol defining async RESP3-specific parsing functionality"""
@@ -325,6 +380,7 @@ class AsyncPushNotificationsParser(Protocol):
325380 invalidation_push_handler_func : Optional [Callable ] = None
326381 node_moving_push_handler_func : Optional [Callable [..., Awaitable [None ]]] = None
327382 maintenance_push_handler_func : Optional [Callable [..., Awaitable [None ]]] = None
383+ oss_cluster_maint_push_handler_func : Optional [Callable [..., Awaitable [None ]]] = None
328384
329385 async def handle_pubsub_push_response (self , response ):
330386 """Handle pubsub push responses asynchronously"""
@@ -341,6 +397,7 @@ async def handle_push_response(self, response, **kwargs):
341397 _INVALIDATION_MESSAGE ,
342398 * _MAINTENANCE_MESSAGES ,
343399 _MOVING_MESSAGE ,
400+ _SMIGRATED_MESSAGE ,
344401 ):
345402 return await self .pubsub_push_handler_func (response )
346403
@@ -365,13 +422,26 @@ async def handle_push_response(self, response, **kwargs):
365422 parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING [
366423 msg_type
367424 ][1 ]
368- notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING [
369- msg_type
370- ][0 ]
371- notification = parser_function (response , notification_type )
425+ if msg_type == _SMIGRATING_MESSAGE :
426+ notification = parser_function (response )
427+ else :
428+ notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING [
429+ msg_type
430+ ][0 ]
431+ notification = parser_function (response , notification_type )
372432
373433 if notification is not None :
374434 return await self .maintenance_push_handler_func (notification )
435+ if (
436+ msg_type == _SMIGRATED_MESSAGE
437+ and self .oss_cluster_maint_push_handler_func
438+ ):
439+ parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING [
440+ msg_type
441+ ][1 ]
442+ notification = parser_function (response )
443+ if notification is not None :
444+ return await self .oss_cluster_maint_push_handler_func (notification )
375445 except Exception as e :
376446 logger .error (
377447 "Error handling {} message ({}): {}" .format (msg_type , response , e )
@@ -393,6 +463,9 @@ def set_node_moving_push_handler(self, node_moving_push_handler_func):
393463 def set_maintenance_push_handler (self , maintenance_push_handler_func ):
394464 self .maintenance_push_handler_func = maintenance_push_handler_func
395465
466+ def set_oss_cluster_maint_push_handler (self , oss_cluster_maint_push_handler_func ):
467+ self .oss_cluster_maint_push_handler_func = oss_cluster_maint_push_handler_func
468+
396469
397470class _AsyncRESPBase (AsyncBaseParser ):
398471 """Base class for async resp parsing"""
0 commit comments