Skip to content

Commit 06f0767

Browse files
authored
New verify_connectivity + add get_server_info (#654)
* New `verify_connectivity` + add `get_server_info` * Remove stub tests All stub tests were ported to TestKit, except a hand full that tests the Python driver specific behavior of `verify_connectivity` which has been unified and tests have been added to TestKit.
1 parent 88847cc commit 06f0767

File tree

156 files changed

+882
-2246
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

156 files changed

+882
-2246
lines changed

CHANGELOG.md

+5
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@
7878
It now raises a `ResultConsumedError`.
7979
- New method `Result.closed()` can be used to check for this condition if
8080
necessary.
81+
- `driver.verify_connectivity()`
82+
- All keyword arguments have been deprecated (they were experimental).
83+
They are now ignored and will be removed in a future release.
84+
- The undocumented return value has been removed. If you need information
85+
about the remote server, use `driver.get_server_info()` instead.
8186

8287

8388
## Version 4.4

bin/make-unasync

+1
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ def apply_unasync(files):
213213
additional_test_replacements = {
214214
"_async": "_sync",
215215
"mark_async_test": "mark_sync_test",
216+
"assert_awaited_once": "assert_called_once",
216217
}
217218
additional_testkit_backend_replacements = {}
218219
rules = [

docs/source/api.rst

+1-1
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ This object holds the details required to establish connections with a Neo4j dat
145145
Closing a driver will immediately shut down all connections in the pool.
146146

147147
.. autoclass:: neo4j.Driver()
148-
:members: session, encrypted, close
148+
:members: session, encrypted, close, verify_connectivity, get_server_info
149149

150150

151151
.. _driver-configuration-ref:

docs/source/async_api.rst

+3-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ Async API Documentation
1010
This means everything documented on this page might be removed or change
1111
its API at any time (including in patch releases).
1212

13+
.. versionadded:: 5.0
14+
1315
******************
1416
AsyncGraphDatabase
1517
******************
@@ -126,7 +128,7 @@ This object holds the details required to establish connections with a Neo4j dat
126128
Closing a driver will immediately shut down all connections in the pool.
127129

128130
.. autoclass:: neo4j.AsyncDriver()
129-
:members: session, encrypted, close
131+
:members: session, encrypted, close, verify_connectivity, get_server_info
130132

131133

132134
.. _async-driver-configuration-ref:

neo4j/_async/driver.py

+55-56
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@
3131
SessionConfig,
3232
WorkspaceConfig,
3333
)
34+
from ..exceptions import (
35+
ServiceUnavailable,
36+
SessionExpired,
37+
)
3438
from ..meta import (
3539
deprecation_warn,
3640
experimental,
@@ -236,9 +240,11 @@ class AsyncDriver:
236240
#: Flag if the driver has been closed
237241
_closed = False
238242

239-
def __init__(self, pool):
243+
def __init__(self, pool, default_workspace_config):
240244
assert pool is not None
245+
assert default_workspace_config is not None
241246
self._pool = pool
247+
self._default_workspace_config = default_workspace_config
242248

243249
async def __aenter__(self):
244250
return self
@@ -285,17 +291,56 @@ async def close(self):
285291
await self._pool.close()
286292
self._closed = True
287293

288-
@experimental("The configuration may change in the future.")
294+
# TODO: 6.0 - remove config argument
289295
async def verify_connectivity(self, **config):
290-
""" This verifies if the driver can connect to a remote server or a cluster
291-
by establishing a network connection with the remote and possibly exchanging
292-
a few data before closing the connection. It throws exception if fails to connect.
296+
"""Verify that the driver can establish a connection to the server.
297+
298+
This verifies if the driver can establish a reading connection to a
299+
remote server or a cluster. Some data will be exchanged.
293300
294-
Use the exception to further understand the cause of the connectivity problem.
301+
.. note::
302+
Even if this method raises an exception, the driver still needs to
303+
be closed via :meth:`close` to free up all resources.
295304
296-
Note: Even if this method throws an exception, the driver still need to be closed via close() to free up all resources.
305+
:raises DriverError: if the driver cannot connect to the remote.
306+
Use the exception to further understand the cause of the
307+
connectivity problem.
308+
309+
.. versionchanged:: 5.0 the config parameters will be removed in
310+
version 6 0. It has no effect starting in version 5.0.
297311
"""
298-
raise NotImplementedError
312+
if config:
313+
deprecation_warn(
314+
"verify_connectivity() will not accept any configuration "
315+
"parameters starting with version 6.0."
316+
)
317+
318+
await self.get_server_info()
319+
320+
async def get_server_info(self):
321+
"""Get information about the connected Neo4j server.
322+
323+
Try to establish a working read connection to the remote server or a
324+
member of a cluster and exchange some data. Then return the contacted
325+
server's information.
326+
327+
In a cluster, there is no guarantee about which server will be
328+
contacted.
329+
330+
.. note::
331+
Even if this method raises an exception, the driver still needs to
332+
be closed via :meth:`close` to free up all resources.
333+
334+
:rtype: ServerInfo
335+
336+
:raises DriverError: if the driver cannot connect to the remote.
337+
Use the exception to further understand the cause of the
338+
connectivity problem.
339+
340+
.. versionadded:: 5.0
341+
"""
342+
async with self.session() as session:
343+
return await session._get_server_info()
299344

300345
@experimental("Feature support query, based on Bolt Protocol Version and Neo4j Server Version will change in the future.")
301346
async def supports_multi_db(self):
@@ -339,7 +384,7 @@ def open(cls, target, *, auth=None, **config):
339384

340385
def __init__(self, pool, default_workspace_config):
341386
_Direct.__init__(self, pool.address)
342-
AsyncDriver.__init__(self, pool)
387+
AsyncDriver.__init__(self, pool, default_workspace_config)
343388
self._default_workspace_config = default_workspace_config
344389

345390
def session(self, **config):
@@ -354,17 +399,6 @@ def session(self, **config):
354399
SessionConfig.consume(config) # Consume the config
355400
return AsyncSession(self._pool, session_config)
356401

357-
@experimental("The configuration may change in the future.")
358-
async def verify_connectivity(self, **config):
359-
server_agent = None
360-
config["fetch_size"] = -1
361-
async with self.session(**config) as session:
362-
result = await session.run("RETURN 1 AS x")
363-
value = await result.single().value()
364-
summary = await result.consume()
365-
server_agent = summary.server.agent
366-
return server_agent
367-
368402

369403
class AsyncNeo4jDriver(_Routing, AsyncDriver):
370404
""":class:`.AsyncNeo4jDriver` is instantiated for ``neo4j`` URIs. The
@@ -387,45 +421,10 @@ def open(cls, *targets, auth=None, routing_context=None, **config):
387421

388422
def __init__(self, pool, default_workspace_config):
389423
_Routing.__init__(self, pool.get_default_database_initial_router_addresses())
390-
AsyncDriver.__init__(self, pool)
391-
self._default_workspace_config = default_workspace_config
424+
AsyncDriver.__init__(self, pool, default_workspace_config)
392425

393426
def session(self, **config):
394427
from .work import AsyncSession
395428
session_config = SessionConfig(self._default_workspace_config, config)
396429
SessionConfig.consume(config) # Consume the config
397430
return AsyncSession(self._pool, session_config)
398-
399-
@experimental("The configuration may change in the future.")
400-
async def verify_connectivity(self, **config):
401-
"""
402-
:raise ServiceUnavailable: raised if the server does not support routing or if routing support is broken.
403-
"""
404-
# TODO: Improve and update Stub Test Server to be able to test.
405-
return await self._verify_routing_connectivity()
406-
407-
async def _verify_routing_connectivity(self):
408-
from ..exceptions import (
409-
Neo4jError,
410-
ServiceUnavailable,
411-
SessionExpired,
412-
)
413-
414-
table = self._pool.get_routing_table_for_default_database()
415-
routing_info = {}
416-
for ix in list(table.routers):
417-
try:
418-
routing_info[ix] = await self._pool.fetch_routing_info(
419-
address=table.routers[0],
420-
database=self._default_workspace_config.database,
421-
imp_user=self._default_workspace_config.impersonated_user,
422-
bookmarks=None,
423-
timeout=self._default_workspace_config
424-
.connection_acquisition_timeout
425-
)
426-
except (ServiceUnavailable, SessionExpired, Neo4jError):
427-
routing_info[ix] = None
428-
for key, val in routing_info.items():
429-
if val is not None:
430-
return routing_info
431-
raise ServiceUnavailable("Could not connect to any routing servers.")

neo4j/_async/io/_bolt.py

+46-6
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ class AsyncBolt:
7272
# The socket
7373
in_use = False
7474

75+
# When the connection was last put back into the pool
76+
idle_since = float("-inf")
77+
7578
# The socket
7679
_closed = False
7780

@@ -104,6 +107,7 @@ def __init__(self, unresolved_address, sock, max_connection_lifetime, *,
104107
self._max_connection_lifetime = max_connection_lifetime
105108
self._creation_timestamp = perf_counter()
106109
self.routing_context = routing_context
110+
self.idle_since = perf_counter()
107111

108112
# Determine the user agent
109113
if user_agent:
@@ -456,29 +460,55 @@ async def _send_all(self):
456460
except OSError as error:
457461
await self._set_defunct_write(error)
458462
self.outbox.clear()
463+
self.idle_since = perf_counter()
459464

460465
async def send_all(self):
461466
""" Send all queued messages to the server.
462467
"""
463468
if self.closed():
464-
raise ServiceUnavailable("Failed to write to closed connection {!r} ({!r})".format(
465-
self.unresolved_address, self.server_info.address))
466-
469+
raise ServiceUnavailable(
470+
"Failed to write to closed connection {!r} ({!r})".format(
471+
self.unresolved_address, self.server_info.address
472+
)
473+
)
467474
if self.defunct():
468-
raise ServiceUnavailable("Failed to write to defunct connection {!r} ({!r})".format(
469-
self.unresolved_address, self.server_info.address))
475+
raise ServiceUnavailable(
476+
"Failed to write to defunct connection {!r} ({!r})".format(
477+
self.unresolved_address, self.server_info.address
478+
)
479+
)
470480

471481
await self._send_all()
472482

473483
@abc.abstractmethod
474-
async def fetch_message(self):
484+
async def _fetch_message(self):
475485
""" Receive at most one message from the server, if available.
476486
477487
:return: 2-tuple of number of detail messages and number of summary
478488
messages fetched
479489
"""
480490
pass
481491

492+
async def fetch_message(self):
493+
if self._closed:
494+
raise ServiceUnavailable(
495+
"Failed to read from closed connection {!r} ({!r})".format(
496+
self.unresolved_address, self.server_info.address
497+
)
498+
)
499+
if self._defunct:
500+
raise ServiceUnavailable(
501+
"Failed to read from defunct connection {!r} ({!r})".format(
502+
self.unresolved_address, self.server_info.address
503+
)
504+
)
505+
if not self.responses:
506+
return 0, 0
507+
508+
res = await self._fetch_message()
509+
self.idle_since = perf_counter()
510+
return res
511+
482512
async def fetch_all(self):
483513
""" Fetch all outstanding messages.
484514
@@ -568,5 +598,15 @@ def closed(self):
568598
def defunct(self):
569599
pass
570600

601+
def is_idle_for(self, timeout):
602+
"""Check if connection has been idle for at least the given timeout.
603+
604+
:param timeout: timeout in seconds
605+
:type timeout: float
606+
607+
:rtype: bool
608+
"""
609+
return perf_counter() - self.idle_since > timeout
610+
571611

572612
AsyncBoltSocket.Bolt = AsyncBolt

neo4j/_async/io/_bolt3.py

+1-12
Original file line numberDiff line numberDiff line change
@@ -314,23 +314,12 @@ def fail(metadata):
314314
await self.send_all()
315315
await self.fetch_all()
316316

317-
async def fetch_message(self):
317+
async def _fetch_message(self):
318318
""" Receive at most one message from the server, if available.
319319
320320
:return: 2-tuple of number of detail messages and number of summary
321321
messages fetched
322322
"""
323-
if self._closed:
324-
raise ServiceUnavailable("Failed to read from closed connection {!r} ({!r})".format(
325-
self.unresolved_address, self.server_info.address))
326-
327-
if self._defunct:
328-
raise ServiceUnavailable("Failed to read from defunct connection {!r} ({!r})".format(
329-
self.unresolved_address, self.server_info.address))
330-
331-
if not self.responses:
332-
return 0, 0
333-
334323
# Receive exactly one message
335324
details, summary_signature, summary_metadata = \
336325
await AsyncUtil.next(self.inbox)

neo4j/_async/io/_bolt4.py

+1-12
Original file line numberDiff line numberDiff line change
@@ -265,23 +265,12 @@ def fail(metadata):
265265
await self.send_all()
266266
await self.fetch_all()
267267

268-
async def fetch_message(self):
268+
async def _fetch_message(self):
269269
""" Receive at most one message from the server, if available.
270270
271271
:return: 2-tuple of number of detail messages and number of summary
272272
messages fetched
273273
"""
274-
if self._closed:
275-
raise ServiceUnavailable("Failed to read from closed connection {!r} ({!r})".format(
276-
self.unresolved_address, self.server_info.address))
277-
278-
if self._defunct:
279-
raise ServiceUnavailable("Failed to read from defunct connection {!r} ({!r})".format(
280-
self.unresolved_address, self.server_info.address))
281-
282-
if not self.responses:
283-
return 0, 0
284-
285274
# Receive exactly one message
286275
details, summary_signature, summary_metadata = \
287276
await AsyncUtil.next(self.inbox)

0 commit comments

Comments
 (0)