Skip to content

Fix #796: connection pool clogging up #804

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions neo4j/_async/io/_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
defaultdict,
deque,
)
from contextlib import asynccontextmanager
from logging import getLogger
from random import choice

Expand Down Expand Up @@ -161,11 +160,10 @@ async def connection_creator():
connections = self.connections[address]
pool_size = (len(connections)
+ self.connections_reservations[address])
can_create_new_connection = (infinite_pool_size
or pool_size < max_pool_size)
self.connections_reservations[address] += 1
if can_create_new_connection:
return connection_creator
if infinite_pool_size or pool_size < max_pool_size:
# there's room for a new connection
self.connections_reservations[address] += 1
return connection_creator
return None

async def _acquire(self, address, deadline, liveness_check_timeout):
Expand Down
10 changes: 4 additions & 6 deletions neo4j/_sync/io/_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
defaultdict,
deque,
)
from contextlib import contextmanager
from logging import getLogger
from random import choice

Expand Down Expand Up @@ -161,11 +160,10 @@ def connection_creator():
connections = self.connections[address]
pool_size = (len(connections)
+ self.connections_reservations[address])
can_create_new_connection = (infinite_pool_size
or pool_size < max_pool_size)
self.connections_reservations[address] += 1
if can_create_new_connection:
return connection_creator
if infinite_pool_size or pool_size < max_pool_size:
# there's room for a new connection
self.connections_reservations[address] += 1
return connection_creator
return None

def _acquire(self, address, deadline, liveness_check_timeout):
Expand Down
31 changes: 31 additions & 0 deletions tests/unit/async_/io/test_neo4j_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
WRITE_ACCESS,
)
from neo4j._async.io import AsyncNeo4jPool
from neo4j._async_compat.util import AsyncUtil
from neo4j._conf import (
PoolConfig,
RoutingConfig,
Expand Down Expand Up @@ -437,3 +438,33 @@ async def test_failing_opener_leaves_connections_in_use_alone(opener):
with pytest.raises((ServiceUnavailable, SessionExpired)):
await pool.acquire(READ_ACCESS, 30, "test_db", None, None)
assert not cx1.closed()


@mark_async_test
async def test__acquire_new_later_with_room(opener):
config = PoolConfig()
config.max_connection_pool_size = 1
pool = AsyncNeo4jPool(
opener, config, WorkspaceConfig(), ROUTER_ADDRESS
)
assert pool.connections_reservations[READER_ADDRESS] == 0
creator = pool._acquire_new_later(READER_ADDRESS, Deadline(1))
assert pool.connections_reservations[READER_ADDRESS] == 1
assert callable(creator)
if AsyncUtil.is_async_code:
assert inspect.iscoroutinefunction(creator)


@mark_async_test
async def test__acquire_new_later_without_room(opener):
config = PoolConfig()
config.max_connection_pool_size = 1
pool = AsyncNeo4jPool(
opener, config, WorkspaceConfig(), ROUTER_ADDRESS
)
_ = await pool.acquire(READ_ACCESS, 30, "test_db", None, None)
# pool is full now
assert pool.connections_reservations[READER_ADDRESS] == 0
creator = pool._acquire_new_later(READER_ADDRESS, Deadline(1))
assert pool.connections_reservations[READER_ADDRESS] == 0
assert creator is None
31 changes: 31 additions & 0 deletions tests/unit/sync/io/test_neo4j_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
READ_ACCESS,
WRITE_ACCESS,
)
from neo4j._async_compat.util import Util
from neo4j._conf import (
PoolConfig,
RoutingConfig,
Expand Down Expand Up @@ -437,3 +438,33 @@ def test_failing_opener_leaves_connections_in_use_alone(opener):
with pytest.raises((ServiceUnavailable, SessionExpired)):
pool.acquire(READ_ACCESS, 30, "test_db", None, None)
assert not cx1.closed()


@mark_sync_test
def test__acquire_new_later_with_room(opener):
config = PoolConfig()
config.max_connection_pool_size = 1
pool = Neo4jPool(
opener, config, WorkspaceConfig(), ROUTER_ADDRESS
)
assert pool.connections_reservations[READER_ADDRESS] == 0
creator = pool._acquire_new_later(READER_ADDRESS, Deadline(1))
assert pool.connections_reservations[READER_ADDRESS] == 1
assert callable(creator)
if Util.is_async_code:
assert inspect.iscoroutinefunction(creator)


@mark_sync_test
def test__acquire_new_later_without_room(opener):
config = PoolConfig()
config.max_connection_pool_size = 1
pool = Neo4jPool(
opener, config, WorkspaceConfig(), ROUTER_ADDRESS
)
_ = pool.acquire(READ_ACCESS, 30, "test_db", None, None)
# pool is full now
assert pool.connections_reservations[READER_ADDRESS] == 0
creator = pool._acquire_new_later(READER_ADDRESS, Deadline(1))
assert pool.connections_reservations[READER_ADDRESS] == 0
assert creator is None