From 454f6dee03360146690524be38e0e1724074a2be Mon Sep 17 00:00:00 2001 From: Robsdedude Date: Fri, 16 Sep 2022 13:53:39 +0200 Subject: [PATCH 1/2] Fix #796: connection pool clogging up The reservation count was also increased if the pool was full and the reserveration would never be turned into a new connection. This lead to the pool clogging up after a while. --- neo4j/_async/io/_pool.py | 10 ++++------ neo4j/_sync/io/_pool.py | 10 ++++------ 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/neo4j/_async/io/_pool.py b/neo4j/_async/io/_pool.py index d0a9acd4f..f61ebd46b 100644 --- a/neo4j/_async/io/_pool.py +++ b/neo4j/_async/io/_pool.py @@ -23,7 +23,6 @@ defaultdict, deque, ) -from contextlib import asynccontextmanager from logging import getLogger from random import choice @@ -161,11 +160,10 @@ async def connection_creator(): connections = self.connections[address] pool_size = (len(connections) + self.connections_reservations[address]) - can_create_new_connection = (infinite_pool_size - or pool_size < max_pool_size) - self.connections_reservations[address] += 1 - if can_create_new_connection: - return connection_creator + if infinite_pool_size or pool_size < max_pool_size: + # there's room for a new connection + self.connections_reservations[address] += 1 + return connection_creator return None async def _acquire(self, address, deadline, liveness_check_timeout): diff --git a/neo4j/_sync/io/_pool.py b/neo4j/_sync/io/_pool.py index d031ad8c8..0a379009e 100644 --- a/neo4j/_sync/io/_pool.py +++ b/neo4j/_sync/io/_pool.py @@ -23,7 +23,6 @@ defaultdict, deque, ) -from contextlib import contextmanager from logging import getLogger from random import choice @@ -161,11 +160,10 @@ def connection_creator(): connections = self.connections[address] pool_size = (len(connections) + self.connections_reservations[address]) - can_create_new_connection = (infinite_pool_size - or pool_size < max_pool_size) - self.connections_reservations[address] += 1 - if can_create_new_connection: - return connection_creator + if infinite_pool_size or pool_size < max_pool_size: + # there's room for a new connection + self.connections_reservations[address] += 1 + return connection_creator return None def _acquire(self, address, deadline, liveness_check_timeout): From 75a77e83b8b6d5322526bd2622dd187b3b8f02f8 Mon Sep 17 00:00:00 2001 From: Robsdedude Date: Mon, 19 Sep 2022 09:45:10 +0200 Subject: [PATCH 2/2] Add unit tests --- tests/unit/async_/io/test_neo4j_pool.py | 31 +++++++++++++++++++++++++ tests/unit/sync/io/test_neo4j_pool.py | 31 +++++++++++++++++++++++++ 2 files changed, 62 insertions(+) diff --git a/tests/unit/async_/io/test_neo4j_pool.py b/tests/unit/async_/io/test_neo4j_pool.py index b9bbc4e42..e4dec78ba 100644 --- a/tests/unit/async_/io/test_neo4j_pool.py +++ b/tests/unit/async_/io/test_neo4j_pool.py @@ -25,6 +25,7 @@ WRITE_ACCESS, ) from neo4j._async.io import AsyncNeo4jPool +from neo4j._async_compat.util import AsyncUtil from neo4j._conf import ( PoolConfig, RoutingConfig, @@ -437,3 +438,33 @@ async def test_failing_opener_leaves_connections_in_use_alone(opener): with pytest.raises((ServiceUnavailable, SessionExpired)): await pool.acquire(READ_ACCESS, 30, "test_db", None, None) assert not cx1.closed() + + +@mark_async_test +async def test__acquire_new_later_with_room(opener): + config = PoolConfig() + config.max_connection_pool_size = 1 + pool = AsyncNeo4jPool( + opener, config, WorkspaceConfig(), ROUTER_ADDRESS + ) + assert pool.connections_reservations[READER_ADDRESS] == 0 + creator = pool._acquire_new_later(READER_ADDRESS, Deadline(1)) + assert pool.connections_reservations[READER_ADDRESS] == 1 + assert callable(creator) + if AsyncUtil.is_async_code: + assert inspect.iscoroutinefunction(creator) + + +@mark_async_test +async def test__acquire_new_later_without_room(opener): + config = PoolConfig() + config.max_connection_pool_size = 1 + pool = AsyncNeo4jPool( + opener, config, WorkspaceConfig(), ROUTER_ADDRESS + ) + _ = await pool.acquire(READ_ACCESS, 30, "test_db", None, None) + # pool is full now + assert pool.connections_reservations[READER_ADDRESS] == 0 + creator = pool._acquire_new_later(READER_ADDRESS, Deadline(1)) + assert pool.connections_reservations[READER_ADDRESS] == 0 + assert creator is None diff --git a/tests/unit/sync/io/test_neo4j_pool.py b/tests/unit/sync/io/test_neo4j_pool.py index bca9d4441..17507c716 100644 --- a/tests/unit/sync/io/test_neo4j_pool.py +++ b/tests/unit/sync/io/test_neo4j_pool.py @@ -24,6 +24,7 @@ READ_ACCESS, WRITE_ACCESS, ) +from neo4j._async_compat.util import Util from neo4j._conf import ( PoolConfig, RoutingConfig, @@ -437,3 +438,33 @@ def test_failing_opener_leaves_connections_in_use_alone(opener): with pytest.raises((ServiceUnavailable, SessionExpired)): pool.acquire(READ_ACCESS, 30, "test_db", None, None) assert not cx1.closed() + + +@mark_sync_test +def test__acquire_new_later_with_room(opener): + config = PoolConfig() + config.max_connection_pool_size = 1 + pool = Neo4jPool( + opener, config, WorkspaceConfig(), ROUTER_ADDRESS + ) + assert pool.connections_reservations[READER_ADDRESS] == 0 + creator = pool._acquire_new_later(READER_ADDRESS, Deadline(1)) + assert pool.connections_reservations[READER_ADDRESS] == 1 + assert callable(creator) + if Util.is_async_code: + assert inspect.iscoroutinefunction(creator) + + +@mark_sync_test +def test__acquire_new_later_without_room(opener): + config = PoolConfig() + config.max_connection_pool_size = 1 + pool = Neo4jPool( + opener, config, WorkspaceConfig(), ROUTER_ADDRESS + ) + _ = pool.acquire(READ_ACCESS, 30, "test_db", None, None) + # pool is full now + assert pool.connections_reservations[READER_ADDRESS] == 0 + creator = pool._acquire_new_later(READER_ADDRESS, Deadline(1)) + assert pool.connections_reservations[READER_ADDRESS] == 0 + assert creator is None