Skip to content

Commit eff0505

Browse files
zakafzach-iee
authored andcommitted
release already acquired connections on ClusterPipeline, when get_connection raises an exception
1 parent 1a7d474 commit eff0505

File tree

2 files changed

+30
-0
lines changed

2 files changed

+30
-0
lines changed

redis/cluster.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2143,6 +2143,8 @@ def _send_cluster_commands(
21432143
try:
21442144
connection = get_connection(redis_node, c.args)
21452145
except ConnectionError:
2146+
for n in nodes.values():
2147+
n.connection_pool.release(n.connection)
21462148
# Connection retries are being handled in the node's
21472149
# Retry object. Reinitialize the node -> slot table.
21482150
self.nodes_manager.initialize()

tests/test_cluster.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
from unittest.mock import DEFAULT, Mock, call, patch
1111

1212
import pytest
13+
14+
import redis
1315
from redis import Redis
1416
from redis._parsers import CommandsParser
1517
from redis.backoff import ExponentialBackoff, NoBackoff, default_backoff
@@ -3250,6 +3252,32 @@ def raise_ask_error():
32503252
assert ask_node.redis_connection.connection.read_response.called
32513253
assert res == ["MOCK_OK"]
32523254

3255+
def test_return_previously_acquired_connections(self, r):
3256+
# in order to ensure that a pipeline will make use of connections
3257+
# from different nodes
3258+
assert r.keyslot("a") != r.keyslot("b")
3259+
3260+
orig_func = redis.cluster.get_connection
3261+
with patch("redis.cluster.get_connection") as get_connection:
3262+
3263+
def raise_error(target_node, *args, **kwargs):
3264+
if get_connection.call_count == 2:
3265+
raise ConnectionError("mocked error")
3266+
else:
3267+
return orig_func(target_node, *args, **kwargs)
3268+
3269+
get_connection.side_effect = raise_error
3270+
3271+
r.pipeline().get("a").get("b").execute()
3272+
3273+
# there should have been two get_connections per execution and
3274+
# two executions due to exception raised in the first execution
3275+
assert get_connection.call_count == 4
3276+
for cluster_node in r.nodes_manager.nodes_cache.values():
3277+
connection_pool = cluster_node.redis_connection.connection_pool
3278+
num_of_conns = len(connection_pool._available_connections)
3279+
assert num_of_conns == connection_pool._created_connections
3280+
32533281
def test_empty_stack(self, r):
32543282
"""
32553283
If pipeline is executed with no commands it should

0 commit comments

Comments
 (0)