Skip to content

Commit 8e36f83

Browse files
zakafvladvildanov
authored andcommitted
release already acquired connections on ClusterPipeline, when get_connection raises an exception (#3133)
Signed-off-by: zach.lee <[email protected]>
1 parent 5261881 commit 8e36f83

File tree

2 files changed

+28
-0
lines changed

2 files changed

+28
-0
lines changed

redis/cluster.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2143,6 +2143,8 @@ def _send_cluster_commands(
21432143
try:
21442144
connection = get_connection(redis_node, c.args)
21452145
except ConnectionError:
2146+
for n in nodes.values():
2147+
n.connection_pool.release(n.connection)
21462148
# Connection retries are being handled in the node's
21472149
# Retry object. Reinitialize the node -> slot table.
21482150
self.nodes_manager.initialize()

tests/test_cluster.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from unittest.mock import DEFAULT, Mock, call, patch
1111

1212
import pytest
13+
import redis
1314
from redis import Redis
1415
from redis._parsers import CommandsParser
1516
from redis.backoff import ExponentialBackoff, NoBackoff, default_backoff
@@ -3250,6 +3251,31 @@ def raise_ask_error():
32503251
assert ask_node.redis_connection.connection.read_response.called
32513252
assert res == ["MOCK_OK"]
32523253

3254+
def test_return_previously_acquired_connections(self, r):
3255+
# in order to ensure that a pipeline will make use of connections
3256+
# from different nodes
3257+
assert r.keyslot("a") != r.keyslot("b")
3258+
3259+
orig_func = redis.cluster.get_connection
3260+
with patch("redis.cluster.get_connection") as get_connection:
3261+
3262+
def raise_error(target_node, *args, **kwargs):
3263+
if get_connection.call_count == 2:
3264+
raise ConnectionError("mocked error")
3265+
else:
3266+
return orig_func(target_node, *args, **kwargs)
3267+
3268+
get_connection.side_effect = raise_error
3269+
3270+
r.pipeline().get("a").get("b").execute()
3271+
3272+
# 4 = 2 get_connections per execution * 2 executions
3273+
assert get_connection.call_count == 4
3274+
for cluster_node in r.nodes_manager.nodes_cache.values():
3275+
connection_pool = cluster_node.redis_connection.connection_pool
3276+
num_of_conns = len(connection_pool._available_connections)
3277+
assert num_of_conns == connection_pool._created_connections
3278+
32533279
def test_empty_stack(self, r):
32543280
"""
32553281
If pipeline is executed with no commands it should

0 commit comments

Comments
 (0)