Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
b67594a
Adding maintenance notification configuration to cluster client. Inte…
petyaslavova Nov 17, 2025
3b05201
Adding OSS Notification Classes for SMIGRATING and SMIGRATED. Handlin…
petyaslavova Nov 20, 2025
9c1dc88
Adding handling of SMIGRATED push notifications (#3857)
petyaslavova Dec 1, 2025
84d9c2d
Adding test coverage for cluster pubsub behavior with maintenance pu…
petyaslavova Dec 3, 2025
925a7fc
Applying the changes in the SMIGRATED notification format (#3868)
petyaslavova Dec 8, 2025
cb79cd2
Wrapper for fault injector and mock proxy service + cluster maint not…
petyaslavova Dec 15, 2025
56321af
Adding e2e maint notifications tests covering node migration and rebi…
petyaslavova Dec 16, 2025
e07381c
Add e2e tests - validate new connections will receive maintenance not…
petyaslavova Jan 29, 2026
42f0411
Refactoring the SMIGRATED flow - the notification is changed to conta…
petyaslavova Feb 9, 2026
9847aab
Merge branch 'master' into feat/hitless_upgrade_sync_cluster_client
petyaslavova Feb 9, 2026
9327231
Applying augment review comments + fix for a flaky test in async toke…
petyaslavova Feb 9, 2026
7299f8d
Fixing some failing tests. Ignoring cache enabled test for maint noti…
petyaslavova Feb 9, 2026
3768c42
Updating lib version to match the last stable release
petyaslavova Feb 9, 2026
8db51bb
Fixing flaky test + exclude cache enabled test for servers <7.4
petyaslavova Feb 10, 2026
1940e7b
Changing the maint notifications logging strategy - to use named logg…
petyaslavova Feb 10, 2026
6a3f410
Fixing flaky test
petyaslavova Feb 10, 2026
dfe8c36
Merge branch 'master' into feat/hitless_upgrade_sync_cluster_client
petyaslavova Feb 10, 2026
37a80af
Merge branch 'master' into feat/hitless_upgrade_sync_cluster_client
petyaslavova Feb 11, 2026
dba0f04
Adding debug logging for some cluster operations
petyaslavova Feb 11, 2026
dea7483
Adding debug logging for some cluster operations
petyaslavova Feb 11, 2026
7139895
Updating cryptography dependency due to security issues.
petyaslavova Feb 11, 2026
72b985a
Updating cryptography dependency due to security issues.
petyaslavova Feb 11, 2026
c5824ff
reverting cryptography version changes - should stick to the versions…
petyaslavova Feb 11, 2026
052a7a3
Ignoring cryptography dependency vulnerability for now - it needs ent…
petyaslavova Feb 11, 2026
2117a31
Improving the cluster client debug logging
petyaslavova Feb 12, 2026
35c2036
Fixing debug enabled evaluation
petyaslavova Feb 12, 2026
ba05645
Decreasing the size of the logged debug info
petyaslavova Feb 12, 2026
c1c44a5
Potential fix for code scanning alert no. 20: Clear-text logging of s…
petyaslavova Feb 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ docker/stunnel/keys
/dockers/replica/
/dockers/sentinel/
/dockers/redis-stack/
/experimenting/
44 changes: 44 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ x-client-libs-stack-image: &client-libs-stack-image
x-client-libs-image: &client-libs-image
image: "redislabs/client-libs-test:${CLIENT_LIBS_TEST_IMAGE_TAG:-8.4.0}"

networks:
redis-net:
driver: bridge
services:

redis:
Expand Down Expand Up @@ -108,3 +111,44 @@ services:
- standalone
- all-stack
- all

redis-proxied:
<<: *client-libs-image
container_name: redis-proxied
ports:
- "3000:3000"
networks:
- redis-net
healthcheck:
test: ["CMD", "redis-cli", "-p", "3000", "PING"]
interval: 10s
timeout: 3s
retries: 5

resp-proxy:
image: redislabs/client-resp-proxy
container_name: resp-proxy
environment:
LISTEN_HOST: "0.0.0.0"
LISTEN_PORT: "15379,15380,15381"
TARGET_HOST: "redis-proxied"
TARGET_PORT: "3000"
API_PORT: "4000"
ENABLE_LOGGING: true
SIMULATE_CLUSTER: true
DEFAULT_INTERCEPTORS: "cluster,hitless,logger"

ports:
- "15379:15379"
- "15380:15380"
- "15381:15381"
- "4000:4000"
depends_on:
- redis-proxied
networks:
- redis-net
healthcheck:
test: ["CMD", "sh", "-c", "wget -qO- http://localhost:4000/stats || exit 1"]
interval: 10s
timeout: 3s
retries: 5
114 changes: 103 additions & 11 deletions redis/_parsers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
NodeMigratedNotification,
NodeMigratingNotification,
NodeMovingNotification,
OSSNodeMigratedNotification,
OSSNodeMigratingNotification,
)
from redis.utils import safe_str

if sys.version_info.major >= 3 and sys.version_info.minor >= 11:
from asyncio import timeout as async_timeout
Expand Down Expand Up @@ -178,16 +181,56 @@ async def read_response(
class MaintenanceNotificationsParser:
"""Protocol defining maintenance push notification parsing functionality"""

@staticmethod
def parse_oss_maintenance_start_msg(response):
    """Build an ``OSSNodeMigratingNotification`` from an SMIGRATING push message.

    Expected message format is:
        SMIGRATING <seq_number> <slot, range1-range2,...>

    :param response: Indexable push message. Element 1 is forwarded as the
        notification id and element 2 as the slots value, both unmodified.
    :return: The constructed ``OSSNodeMigratingNotification``.
    :raises IndexError: If ``response`` has fewer than three elements.
    """
    # Named ``seq_number`` rather than ``id`` so the builtin is not shadowed.
    seq_number = response[1]
    slots = response[2]
    return OSSNodeMigratingNotification(seq_number, slots)

@staticmethod
def parse_oss_maintenance_completed_msg(response):
    """Build an ``OSSNodeMigratedNotification`` from an SMIGRATED push message.

    Expected message format is:
        SMIGRATED <seq_number> [[<src_host:port> <dest_host:port> <slot_range>], ...]

    The triples in element 2 are grouped by source node into this structure::

        {
            "src_host:port": [
                {"dest_host:port": "slot_range"},
                ...
            ],
            ...
        }

    :param response: Indexable push message. Element 1 is forwarded as the
        notification id; element 2 is an iterable of
        ``(src_node, dest_node, slots)`` triples.
    :return: The constructed ``OSSNodeMigratedNotification``.
    :raises ValueError: If an entry of element 2 does not unpack into exactly
        three items.
    """
    # Named ``seq_number`` rather than ``id`` so the builtin is not shadowed.
    seq_number = response[1]

    nodes_to_slots_mapping = {}
    for src_node, dest_node, slots in response[2]:
        # Every part goes through safe_str before being used as a key/value.
        src_node_str = safe_str(src_node)
        dest_node_str = safe_str(dest_node)
        slots_str = safe_str(slots)

        # setdefault creates the per-source list on first sight of a source.
        nodes_to_slots_mapping.setdefault(src_node_str, []).append(
            {dest_node_str: slots_str}
        )

    return OSSNodeMigratedNotification(seq_number, nodes_to_slots_mapping)

@staticmethod
def parse_maintenance_start_msg(response, notification_type):
    """Build a "maintenance started" notification from a push message.

    Expected message format is: <notification_type> <seq_number> <time>
    Examples:
        MIGRATING 1 10
        FAILING_OVER 2 20

    :param response: Indexable push message. Element 1 is the sequence number
        and element 2 the time value.
    :param notification_type: Callable invoked as
        ``notification_type(seq_number, ttl)`` to build the notification.
    :return: Whatever ``notification_type`` returns.
    :raises IndexError: If ``response`` has fewer than three elements.
    """
    # Named ``seq_number`` rather than ``id`` so the builtin is not shadowed.
    seq_number = response[1]
    ttl = response[2]
    return notification_type(seq_number, ttl)

@staticmethod
def parse_maintenance_completed_msg(response, notification_type):
    """Build a "maintenance completed" notification from a push message.

    Expected message format is: <notification_type> <seq_number>
    Examples:
        MIGRATED 1
        FAILED_OVER 2

    :param response: Indexable push message. Element 1 is the sequence number.
    :param notification_type: Callable invoked as
        ``notification_type(seq_number)`` to build the notification.
    :return: Whatever ``notification_type`` returns.
    :raises IndexError: If ``response`` has fewer than two elements.
    """
    # Named ``seq_number`` rather than ``id`` so the builtin is not shadowed.
    seq_number = response[1]
    return notification_type(seq_number)

Expand All @@ -199,9 +242,7 @@ def parse_moving_msg(response):
if response[3] is None:
host, port = None, None
else:
value = response[3]
if isinstance(value, bytes):
value = value.decode()
value = safe_str(response[3])
host, port = value.split(":")
port = int(port) if port is not None else None

Expand All @@ -214,12 +255,15 @@ def parse_moving_msg(response):
_MIGRATED_MESSAGE = "MIGRATED"
_FAILING_OVER_MESSAGE = "FAILING_OVER"
_FAILED_OVER_MESSAGE = "FAILED_OVER"
_SMIGRATING_MESSAGE = "SMIGRATING"
_SMIGRATED_MESSAGE = "SMIGRATED"

# Message types grouped as maintenance notifications. _SMIGRATED_MESSAGE is
# not part of this tuple: the visible handle_push_response code names it
# separately and parses it in its own branch.
_MAINTENANCE_MESSAGES = (
_MIGRATING_MESSAGE,
_MIGRATED_MESSAGE,
_FAILING_OVER_MESSAGE,
_FAILED_OVER_MESSAGE,
_SMIGRATING_MESSAGE,
)

MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING: dict[
Expand All @@ -245,6 +289,14 @@ def parse_moving_msg(response):
NodeMovingNotification,
MaintenanceNotificationsParser.parse_moving_msg,
),
_SMIGRATING_MESSAGE: (
OSSNodeMigratingNotification,
MaintenanceNotificationsParser.parse_oss_maintenance_start_msg,
),
_SMIGRATED_MESSAGE: (
OSSNodeMigratedNotification,
MaintenanceNotificationsParser.parse_oss_maintenance_completed_msg,
),
}


Expand All @@ -255,6 +307,7 @@ class PushNotificationsParser(Protocol):
invalidation_push_handler_func: Optional[Callable] = None
node_moving_push_handler_func: Optional[Callable] = None
maintenance_push_handler_func: Optional[Callable] = None
oss_cluster_maint_push_handler_func: Optional[Callable] = None

def handle_pubsub_push_response(self, response):
"""Handle pubsub push responses"""
Expand All @@ -269,6 +322,7 @@ def handle_push_response(self, response, **kwargs):
_INVALIDATION_MESSAGE,
*_MAINTENANCE_MESSAGES,
_MOVING_MESSAGE,
_SMIGRATED_MESSAGE,
):
return self.pubsub_push_handler_func(response)

Expand All @@ -291,13 +345,30 @@ def handle_push_response(self, response, **kwargs):
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
msg_type
][1]
notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
msg_type
][0]
notification = parser_function(response, notification_type)
if msg_type == _SMIGRATING_MESSAGE:
notification = parser_function(response)
else:
notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
msg_type
][0]
notification = parser_function(response, notification_type)

if notification is not None:
return self.maintenance_push_handler_func(notification)
if msg_type == _SMIGRATED_MESSAGE and (
self.oss_cluster_maint_push_handler_func
or self.maintenance_push_handler_func
):
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
msg_type
][1]
notification = parser_function(response)

if notification is not None:
if self.maintenance_push_handler_func:
self.maintenance_push_handler_func(notification)
if self.oss_cluster_maint_push_handler_func:
self.oss_cluster_maint_push_handler_func(notification)
except Exception as e:
logger.error(
"Error handling {} message ({}): {}".format(msg_type, response, e)
Expand All @@ -317,6 +388,9 @@ def set_node_moving_push_handler(self, node_moving_push_handler_func):
def set_maintenance_push_handler(self, maintenance_push_handler_func):
self.maintenance_push_handler_func = maintenance_push_handler_func

def set_oss_cluster_maint_push_handler(self, oss_cluster_maint_push_handler_func):
    """Register the handler stored in ``oss_cluster_maint_push_handler_func``.

    :param oss_cluster_maint_push_handler_func: Value stored as-is on the
        ``oss_cluster_maint_push_handler_func`` attribute of this parser.
    """
    self.oss_cluster_maint_push_handler_func = oss_cluster_maint_push_handler_func


class AsyncPushNotificationsParser(Protocol):
"""Protocol defining async RESP3-specific parsing functionality"""
Expand All @@ -325,6 +399,7 @@ class AsyncPushNotificationsParser(Protocol):
invalidation_push_handler_func: Optional[Callable] = None
node_moving_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
maintenance_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
oss_cluster_maint_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None

async def handle_pubsub_push_response(self, response):
"""Handle pubsub push responses asynchronously"""
Expand All @@ -341,6 +416,7 @@ async def handle_push_response(self, response, **kwargs):
_INVALIDATION_MESSAGE,
*_MAINTENANCE_MESSAGES,
_MOVING_MESSAGE,
_SMIGRATED_MESSAGE,
):
return await self.pubsub_push_handler_func(response)

Expand All @@ -365,13 +441,26 @@ async def handle_push_response(self, response, **kwargs):
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
msg_type
][1]
notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
msg_type
][0]
notification = parser_function(response, notification_type)
if msg_type == _SMIGRATING_MESSAGE:
notification = parser_function(response)
else:
notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
msg_type
][0]
notification = parser_function(response, notification_type)

if notification is not None:
return await self.maintenance_push_handler_func(notification)
if (
msg_type == _SMIGRATED_MESSAGE
and self.oss_cluster_maint_push_handler_func
):
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
msg_type
][1]
notification = parser_function(response)
if notification is not None:
return await self.oss_cluster_maint_push_handler_func(notification)
except Exception as e:
logger.error(
"Error handling {} message ({}): {}".format(msg_type, response, e)
Expand All @@ -393,6 +482,9 @@ def set_node_moving_push_handler(self, node_moving_push_handler_func):
def set_maintenance_push_handler(self, maintenance_push_handler_func):
self.maintenance_push_handler_func = maintenance_push_handler_func

def set_oss_cluster_maint_push_handler(self, oss_cluster_maint_push_handler_func):
    """Register the handler stored in ``oss_cluster_maint_push_handler_func``.

    :param oss_cluster_maint_push_handler_func: Value stored as-is on the
        ``oss_cluster_maint_push_handler_func`` attribute of this parser.
    """
    self.oss_cluster_maint_push_handler_func = oss_cluster_maint_push_handler_func


class _AsyncRESPBase(AsyncBaseParser):
"""Base class for async resp parsing"""
Expand Down
1 change: 1 addition & 0 deletions redis/_parsers/hiredis.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def __init__(self, socket_read_size):
self.pubsub_push_handler_func = self.handle_pubsub_push_response
self.node_moving_push_handler_func = None
self.maintenance_push_handler_func = None
self.oss_cluster_maint_push_handler_func = None
self.invalidation_push_handler_func = None
self._hiredis_PushNotificationType = None

Expand Down
1 change: 1 addition & 0 deletions redis/_parsers/resp3.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def __init__(self, socket_read_size):
self.pubsub_push_handler_func = self.handle_pubsub_push_response
self.node_moving_push_handler_func = None
self.maintenance_push_handler_func = None
self.oss_cluster_maint_push_handler_func = None
self.invalidation_push_handler_func = None

def handle_pubsub_push_response(self, response):
Expand Down
2 changes: 1 addition & 1 deletion redis/_parsers/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def _read_from_socket(
sock.settimeout(timeout)
try:
while True:
data = self._sock.recv(socket_read_size)
data = sock.recv(socket_read_size)
# an empty string indicates the server shutdown the socket
if isinstance(data, bytes) and len(data) == 0:
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
Expand Down
7 changes: 5 additions & 2 deletions redis/asyncio/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,7 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any:
slot = None
else:
slot = await self._determine_slot(*args)
if not slot:
if slot is None:
command_policies = CommandPolicies()
else:
command_policies = CommandPolicies(
Expand Down Expand Up @@ -1212,6 +1212,9 @@ def __repr__(self) -> str:
def __eq__(self, obj: Any) -> bool:
    """Return True when ``obj`` is a ``ClusterNode`` with the same ``name``."""
    if not isinstance(obj, ClusterNode):
        return False
    return obj.name == self.name

def __hash__(self) -> int:
    """Return the hash of this object's ``name`` attribute."""
    node_name = self.name
    return hash(node_name)

_DEL_MESSAGE = "Unclosed ClusterNode object"

def __del__(
Expand Down Expand Up @@ -2188,7 +2191,7 @@ async def _execute(
slot = None
else:
slot = await client._determine_slot(*cmd.args)
if not slot:
if slot is None:
command_policies = CommandPolicies()
else:
command_policies = CommandPolicies(
Expand Down
Loading
Loading