Skip to content

Commit c3f360b

Browse files
committed
Don't perform blocking connect inside the BlockingConnectionQueue Condition variable.
1 parent 054caf3 commit c3f360b

File tree

1 file changed

+22
-9
lines changed

1 file changed

+22
-9
lines changed

redis/asyncio/connection.py

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1027,21 +1027,25 @@ def can_get_connection(self) -> bool:
10271027
)
10281028

10291029
async def get_connection(self, command_name, *keys, **options):
1030-
"""Get a connection from the pool"""
1030+
"""Get a connected connection from the pool"""
1031+
connection = self.get_available_connection()
1032+
try:
1033+
await self.ensure_connection(connection)
1034+
except BaseException:
1035+
await self.release(connection)
1036+
raise
1037+
1038+
return connection
1039+
1040+
def get_available_connection(self):
1041+
"""Get a connection from the pool, without making sure it is connected"""
10311042
try:
10321043
connection = self._available_connections.pop()
10331044
except IndexError:
10341045
if len(self._in_use_connections) >= self.max_connections:
10351046
raise ConnectionError("Too many connections") from None
10361047
connection = self.make_connection()
10371048
self._in_use_connections.add(connection)
1038-
1039-
try:
1040-
await self.ensure_connection(connection)
1041-
except BaseException:
1042-
await self.release(connection)
1043-
raise
1044-
10451049
return connection
10461050

10471051
def get_encoder(self):
@@ -1169,10 +1173,19 @@ async def get_connection(self, command_name, *keys, **options):
11691173
async with async_timeout(self.timeout):
11701174
async with self._condition:
11711175
await self._condition.wait_for(self.can_get_connection)
1172-
return await super().get_connection(command_name, *keys, **options)
1176+
connection = super().get_available_connection()
1177+
11731178
except asyncio.TimeoutError as err:
11741179
raise ConnectionError("No connection available.") from err
11751180

1181+
# We now perform the connection check outside of the lock.
1182+
try:
1183+
await self.ensure_connection(connection)
1184+
return connection
1185+
except BaseException:
1186+
await self.release(connection)
1187+
raise
1188+
11761189
async def release(self, connection: AbstractConnection):
11771190
"""Releases the connection back to the pool."""
11781191
async with self._condition:

0 commit comments

Comments
 (0)