Skip to content

Commit 5258e0c

Browse files
committed
Adding handling of SMIGRATED push notifications (#3857)
* Adding SMIGRATED handling * Applying Copilot's comments * Applying review comments
1 parent f8b0a1a commit 5258e0c

File tree

10 files changed

+1077
-150
lines changed

10 files changed

+1077
-150
lines changed

redis/_parsers/base.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
OSSNodeMigratedNotification,
1515
OSSNodeMigratingNotification,
1616
)
17+
from redis.utils import safe_str
1718

1819
if sys.version_info.major >= 3 and sys.version_info.minor >= 11:
1920
from asyncio import timeout as async_timeout
@@ -193,8 +194,9 @@ def parse_oss_maintenance_completed_msg(response):
193194
# Expected message format is:
194195
# SMIGRATED <seq_number> <host:port> <slot, range1-range2,...>
195196
id = response[1]
196-
node_address = response[2]
197+
node_address = safe_str(response[2])
197198
slots = response[3]
199+
198200
return OSSNodeMigratedNotification(id, node_address, slots)
199201

200202
@staticmethod
@@ -224,9 +226,7 @@ def parse_moving_msg(response):
224226
if response[3] is None:
225227
host, port = None, None
226228
else:
227-
value = response[3]
228-
if isinstance(value, bytes):
229-
value = value.decode()
229+
value = safe_str(response[3])
230230
host, port = value.split(":")
231231
port = int(port) if port is not None else None
232232

redis/client.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,12 @@
5959
from redis.lock import Lock
6060
from redis.maint_notifications import (
6161
MaintNotificationsConfig,
62+
OSSMaintNotificationsHandler,
6263
)
6364
from redis.retry import Retry
6465
from redis.utils import (
6566
_set_info_logger,
67+
check_protocol_version,
6668
deprecated_args,
6769
safe_str,
6870
str_if_bytes,
@@ -256,6 +258,9 @@ def __init__(
256258
cache_config: Optional[CacheConfig] = None,
257259
event_dispatcher: Optional[EventDispatcher] = None,
258260
maint_notifications_config: Optional[MaintNotificationsConfig] = None,
261+
oss_cluster_maint_notifications_handler: Optional[
262+
OSSMaintNotificationsHandler
263+
] = None,
259264
) -> None:
260265
"""
261266
Initialize a new Redis client.
@@ -303,6 +308,11 @@ def __init__(
303308
will be enabled by default (logic is included in the connection pool
304309
initialization).
305310
Argument is ignored when connection_pool is provided.
311+
oss_cluster_maint_notifications_handler:
312+
handler for OSS cluster notifications - see
313+
`redis.maint_notifications.OSSMaintNotificationsHandler` for details.
314+
Only supported with RESP3
315+
Argument is ignored when connection_pool is provided.
306316
"""
307317
if event_dispatcher is None:
308318
self._event_dispatcher = EventDispatcher()
@@ -377,7 +387,7 @@ def __init__(
377387
"ssl_ciphers": ssl_ciphers,
378388
}
379389
)
380-
if (cache_config or cache) and protocol in [3, "3"]:
390+
if (cache_config or cache) and check_protocol_version(protocol, 3):
381391
kwargs.update(
382392
{
383393
"cache": cache,
@@ -400,6 +410,12 @@ def __init__(
400410
"maint_notifications_config": maint_notifications_config,
401411
}
402412
)
413+
if oss_cluster_maint_notifications_handler:
414+
kwargs.update(
415+
{
416+
"oss_cluster_maint_notifications_handler": oss_cluster_maint_notifications_handler,
417+
}
418+
)
403419
connection_pool = ConnectionPool(**kwargs)
404420
self._event_dispatcher.dispatch(
405421
AfterPooledConnectionsInstantiationEvent(

redis/cluster.py

Lines changed: 114 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,13 @@
6262
WatchError,
6363
)
6464
from redis.lock import Lock
65-
from redis.maint_notifications import MaintNotificationsConfig
65+
from redis.maint_notifications import (
66+
MaintNotificationsConfig,
67+
OSSMaintNotificationsHandler,
68+
)
6669
from redis.retry import Retry
6770
from redis.utils import (
71+
check_protocol_version,
6872
deprecated_args,
6973
deprecated_function,
7074
dict_merge,
@@ -226,6 +230,67 @@ def cleanup_kwargs(**kwargs):
226230
return connection_kwargs
227231

228232

233+
class MaintNotificationsAbstractRedisCluster:
234+
"""
235+
Abstract class for handling maintenance notifications logic.
236+
This class is expected to be used as base class together with RedisCluster.
237+
238+
This class is intended to be used with multiple inheritance!
239+
240+
All logic related to maintenance notifications is encapsulated in this class.
241+
"""
242+
243+
def __init__(
244+
self,
245+
maint_notifications_config: Optional[MaintNotificationsConfig],
246+
**kwargs,
247+
):
248+
# Initialize maintenance notifications
249+
is_protocol_supported = check_protocol_version(kwargs.get("protocol"), 3)
250+
251+
if (
252+
maint_notifications_config
253+
and maint_notifications_config.enabled
254+
and not is_protocol_supported
255+
):
256+
raise RedisError(
257+
"Maintenance notifications handlers on connection are only supported with RESP version 3"
258+
)
259+
if maint_notifications_config is None and is_protocol_supported:
260+
maint_notifications_config = MaintNotificationsConfig()
261+
262+
self.maint_notifications_config = maint_notifications_config
263+
264+
if self.maint_notifications_config and self.maint_notifications_config.enabled:
265+
self._oss_cluster_maint_notifications_handler = (
266+
OSSMaintNotificationsHandler(self, self.maint_notifications_config)
267+
)
268+
# Update connection kwargs for all future nodes connections
269+
self._update_connection_kwargs_for_maint_notifications(
270+
self._oss_cluster_maint_notifications_handler
271+
)
272+
# Update existing nodes connections - they are created as part of the RedisCluster constructor
273+
for node in self.get_nodes():
274+
node.redis_connection.connection_pool.update_maint_notifications_config(
275+
self.maint_notifications_config,
276+
oss_cluster_maint_notifications_handler=self._oss_cluster_maint_notifications_handler,
277+
)
278+
else:
279+
self._oss_cluster_maint_notifications_handler = None
280+
281+
def _update_connection_kwargs_for_maint_notifications(
282+
self, oss_cluster_maint_notifications_handler: OSSMaintNotificationsHandler
283+
):
284+
"""
285+
Update the connection kwargs for all future connections.
286+
"""
287+
self.nodes_manager.connection_kwargs.update(
288+
{
289+
"oss_cluster_maint_notifications_handler": oss_cluster_maint_notifications_handler,
290+
}
291+
)
292+
293+
229294
class AbstractRedisCluster:
230295
RedisClusterRequestTTL = 16
231296

@@ -473,7 +538,9 @@ def replace_default_node(self, target_node: "ClusterNode" = None) -> None:
473538
self.nodes_manager.default_node = random.choice(replicas)
474539

475540

476-
class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
541+
class RedisCluster(
542+
AbstractRedisCluster, MaintNotificationsAbstractRedisCluster, RedisClusterCommands
543+
):
477544
@classmethod
478545
def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
479546
"""
@@ -624,8 +691,7 @@ def __init__(
624691
`redis.maint_notifications.MaintNotificationsConfig` for details.
625692
Only supported with RESP3.
626693
If not provided and protocol is RESP3, the maintenance notifications
627-
will be enabled by default (logic is included in the NodesManager
628-
initialization).
694+
will be enabled by default.
629695
:**kwargs:
630696
Extra arguments that will be sent into Redis instance when created
631697
(See Official redis-py doc for supported kwargs - the only limitation
@@ -707,9 +773,16 @@ def __init__(
707773
kwargs.get("decode_responses", False),
708774
)
709775
protocol = kwargs.get("protocol", None)
710-
if (cache_config or cache) and protocol not in [3, "3"]:
776+
if (cache_config or cache) and not check_protocol_version(protocol, 3):
711777
raise RedisError("Client caching is only supported with RESP version 3")
712778

779+
if maint_notifications_config and not check_protocol_version(protocol, 3):
780+
raise RedisError(
781+
"Maintenance notifications are only supported with RESP version 3"
782+
)
783+
if check_protocol_version(protocol, 3) and maint_notifications_config is None:
784+
maint_notifications_config = MaintNotificationsConfig()
785+
713786
self.command_flags = self.__class__.COMMAND_FLAGS.copy()
714787
self.node_flags = self.__class__.NODE_FLAGS.copy()
715788
self.read_from_replicas = read_from_replicas
@@ -721,6 +794,7 @@ def __init__(
721794
else:
722795
self._event_dispatcher = event_dispatcher
723796
self.startup_nodes = startup_nodes
797+
724798
self.nodes_manager = NodesManager(
725799
startup_nodes=startup_nodes,
726800
from_url=from_url,
@@ -775,6 +849,10 @@ def __init__(
775849
self._aggregate_nodes = None
776850
self._lock = threading.RLock()
777851

852+
MaintNotificationsAbstractRedisCluster.__init__(
853+
self, maint_notifications_config, **kwargs
854+
)
855+
778856
def __enter__(self):
779857
return self
780858

@@ -1645,9 +1723,7 @@ def __init__(
16451723
cache_config: Optional[CacheConfig] = None,
16461724
cache_factory: Optional[CacheFactoryInterface] = None,
16471725
event_dispatcher: Optional[EventDispatcher] = None,
1648-
maint_notifications_config: Optional[
1649-
MaintNotificationsConfig
1650-
] = MaintNotificationsConfig(),
1726+
maint_notifications_config: Optional[MaintNotificationsConfig] = None,
16511727
**kwargs,
16521728
):
16531729
self.nodes_cache: dict[str, ClusterNode] = {}
@@ -1922,11 +1998,29 @@ def _get_epoch(self) -> int:
19221998
with self._lock:
19231999
return self._epoch
19242000

1925-
def initialize(self):
2001+
def initialize(
2002+
self,
2003+
additional_startup_nodes_info: List[Tuple[str, int]] = [],
2004+
disconnect_startup_nodes_pools: bool = True,
2005+
):
19262006
"""
19272007
Initializes the nodes cache, slots cache and redis connections.
19282008
:startup_nodes:
19292009
Responsible for discovering other nodes in the cluster
2010+
:disconnect_startup_nodes_pools:
2011+
Whether to disconnect the connection pool of the startup nodes
2012+
after the initialization is complete. This is useful when the
2013+
startup nodes are not part of the cluster and we want to avoid
2014+
keeping the connection open.
2015+
:additional_startup_nodes_info:
2016+
Additional nodes to add temporarily to the startup nodes.
2017+
The additional nodes will be used just in the process of extraction of the slots
2018+
and nodes information from the cluster.
2019+
This is useful when we want to add new nodes to the cluster
2020+
and initialize the client
2021+
with them.
2022+
The format of the list is a list of tuples, where each tuple contains
2023+
the host and port of the node.
19302024
"""
19312025
self.reset()
19322026
tmp_nodes_cache = {}
@@ -1945,13 +2039,18 @@ def initialize(self):
19452039
# bother running again
19462040
return
19472041

2042+
additional_startup_nodes = [
2043+
ClusterNode(host, port) for host, port in additional_startup_nodes_info
2044+
]
2045+
19482046
with self._lock:
19492047
startup_nodes = tuple(self.startup_nodes.values())
19502048

1951-
for startup_node in startup_nodes:
2049+
for startup_node in (*startup_nodes, *additional_startup_nodes):
19522050
try:
19532051
if startup_node.redis_connection:
19542052
r = startup_node.redis_connection
2053+
19552054
else:
19562055
# Create a new Redis connection
19572056
r = self.create_redis_node(
@@ -1961,7 +2060,11 @@ def initialize(self):
19612060
# Make sure cluster mode is enabled on this node
19622061
try:
19632062
cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
1964-
r.connection_pool.disconnect()
2063+
if disconnect_startup_nodes_pools:
2064+
# Disconnect the connection pool to avoid keeping the connection open
2065+
# For some cases we might not want to disconnect current pool and
2066+
# lose in flight commands responses
2067+
r.connection_pool.disconnect()
19652068
except ResponseError:
19662069
raise RedisClusterException(
19672070
"Cluster mode is not enabled on this node"

0 commit comments

Comments
 (0)