Skip to content

Commit 188bd49

Browse files
zakafzach-iee
authored andcommitted
release already acquired connections on ClusterPipeline, when get_connection raises an exception
1 parent 1a7d474 commit 188bd49

File tree

2 files changed

+29
-0
lines changed

2 files changed

+29
-0
lines changed

redis/cluster.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2143,6 +2143,8 @@ def _send_cluster_commands(
21432143
try:
21442144
connection = get_connection(redis_node, c.args)
21452145
except ConnectionError:
2146+
for n in nodes.values():
2147+
n.connection_pool.release(n.connection)
21462148
# Connection retries are being handled in the node's
21472149
# Retry object. Reinitialize the node -> slot table.
21482150
self.nodes_manager.initialize()

tests/test_cluster.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from unittest.mock import DEFAULT, Mock, call, patch
1111

1212
import pytest
13+
import redis
1314
from redis import Redis
1415
from redis._parsers import CommandsParser
1516
from redis.backoff import ExponentialBackoff, NoBackoff, default_backoff
@@ -3250,6 +3251,32 @@ def raise_ask_error():
32503251
assert ask_node.redis_connection.connection.read_response.called
32513252
assert res == ["MOCK_OK"]
32523253

3254+
def test_return_previously_acquired_connections(self, r):
3255+
# in order to ensure that a pipeline will make use of connections
3256+
# from different nodes
3257+
assert r.keyslot("a") != r.keyslot("b")
3258+
3259+
orig_func = redis.cluster.get_connection
3260+
with patch("redis.cluster.get_connection") as get_connection:
3261+
3262+
def raise_error(target_node, *args, **kwargs):
3263+
if get_connection.call_count == 2:
3264+
raise ConnectionError("mocked error")
3265+
else:
3266+
return orig_func(target_node, *args, **kwargs)
3267+
3268+
get_connection.side_effect = raise_error
3269+
3270+
r.pipeline().get("a").get("b").execute()
3271+
3272+
# there should have been two get_connections per execution and
3273+
# two executions due to exception raised in the first execution
3274+
assert get_connection.call_count == 4
3275+
for cluster_node in r.nodes_manager.nodes_cache.values():
3276+
connection_pool = cluster_node.redis_connection.connection_pool
3277+
num_of_conns = len(connection_pool._available_connections)
3278+
assert num_of_conns == connection_pool._created_connections
3279+
32533280
def test_empty_stack(self, r):
32543281
"""
32553282
If pipeline is executed with no commands it should

0 commit comments

Comments
 (0)