Skip to content

Commit 709a91e

Browse files
committed
Check Result scope and raise ResultConsumedError if appropriate
Results are tied to transactions (except auto-commit transactions). When the transaction ends, the result becomes useless. Raising instead of silently ignoring this fact will help developers to find potential bugs faster. After consuming a Result, there can never be records left. There is meaning in trying to obtain them. Hence, we raise a ResultError to make the user aware of potentially wrong code.
1 parent 0f10fc4 commit 709a91e

File tree

10 files changed

+223
-37
lines changed

10 files changed

+223
-37
lines changed

CHANGELOG.md

+14-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55
- Python 3.10 support added
66
- Python 3.6 support has been dropped.
77
- `Result`, `Session`, and `Transaction` can no longer be imported from
8-
`neo4j.work`. They should've been imported from `neo4j` all along.
8+
`neo4j.work`. They should've been imported from `neo4j` all along.
9+
Remark: It's recommended to import everything needed directly from `noe4j`,
10+
not its submodules or subpackages.
911
- Experimental pipelines feature has been removed.
1012
- Experimental async driver has been added.
1113
- `ResultSummary.server.version_info` has been removed.
@@ -65,6 +67,17 @@
6567
destructor will ever be called. A `ResourceWarning` is emitted instead.
6668
Make sure to configure Python to output those warnings when developing your
6769
application locally (it does not by default).
70+
- Result scope:
71+
- Records of Results cannot be accessed (`peek`, `single`, `iter`, ...)
72+
after their owning transaction has been closed, committed, or rolled back.
73+
Previously, this would yield undefined behavior.
74+
It now raises a `ResultConsumedError`.
75+
- Records of Results cannot be accessed (`peek`, `single`, `iter`, ...)
76+
after the Result has been consumed (`Result.consume()`).
77+
Previously, this would always yield no records.
78+
It now raises a `ResultConsumedError`.
79+
- New method `Result.closed()` can be used to check for this condition if
80+
necessary.
6881

6982

7083
## Version 4.4

docs/source/api.rst

+2
Original file line numberDiff line numberDiff line change
@@ -809,6 +809,8 @@ A :class:`neo4j.Result` is attached to an active connection, through a :class:`n
809809

810810
.. automethod:: data
811811

812+
.. automethod:: closed
813+
812814
See https://neo4j.com/docs/driver-manual/current/cypher-workflow/#driver-type-mapping for more about type mapping.
813815

814816

docs/source/async_api.rst

+2
Original file line numberDiff line numberDiff line change
@@ -501,4 +501,6 @@ A :class:`neo4j.AsyncResult` is attached to an active connection, through a :cla
501501

502502
.. automethod:: data
503503

504+
.. automethod:: closed
505+
504506
See https://neo4j.com/docs/driver-manual/current/cypher-workflow/#driver-type-mapping for more about type mapping.

neo4j/_async/work/result.py

+88-8
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,25 @@
2020

2121
from ..._async_compat.util import AsyncUtil
2222
from ...data import DataDehydrator
23-
from ...exceptions import ResultNotSingleError
23+
from ...exceptions import (
24+
ResultConsumedError,
25+
ResultNotSingleError,
26+
)
2427
from ...work import ResultSummary
2528
from ..io import ConnectionErrorHandler
2629

2730

31+
_RESULT_OUT_OF_SCOPE_ERROR = (
32+
"The result is out of scope. The associated transaction "
33+
"has been closed. Results can only be used while the "
34+
"transaction is open."
35+
)
36+
_RESULT_CONSUMED_ERROR = (
37+
"The result has been consumed. Fetch all needed records before calling "
38+
"Result.consume()."
39+
)
40+
41+
2842
class AsyncResult:
2943
"""A handler for the result of Cypher query execution. Instances
3044
of this class are typically constructed and returned by
@@ -52,7 +66,11 @@ def __init__(self, connection, hydrant, fetch_size, on_closed,
5266
# there ar more records available to pull from the server
5367
self._has_more = False
5468
# the result has been fully iterated or consumed
55-
self._closed = False
69+
self._exhausted = False
70+
# the result has been consumed
71+
self._consumed = False
72+
# the result has been closed as a result of closing the transaction
73+
self._out_of_scope = False
5674

5775
@property
5876
def _qid(self):
@@ -194,7 +212,11 @@ async def __aiter__(self):
194212
self._pull()
195213
await self._connection.send_all()
196214

197-
self._closed = True
215+
self._exhausted = True
216+
if self._out_of_scope:
217+
raise ResultConsumedError(self, _RESULT_OUT_OF_SCOPE_ERROR)
218+
if self._consumed:
219+
raise ResultConsumedError(self, _RESULT_CONSUMED_ERROR)
198220

199221
async def __anext__(self):
200222
return await self.__aiter__().__anext__()
@@ -203,7 +225,7 @@ async def _attach(self):
203225
"""Sets the Result object in an attached state by fetching messages from
204226
the connection to the buffer.
205227
"""
206-
if self._closed is False:
228+
if self._exhausted is False:
207229
while self._attached is False:
208230
await self._connection.fetch_message()
209231

@@ -215,14 +237,18 @@ async def _buffer(self, n=None):
215237
Might ent up with fewer records in the buffer if there are not enough
216238
records available.
217239
"""
240+
if self._out_of_scope:
241+
raise ResultConsumedError(self, _RESULT_OUT_OF_SCOPE_ERROR)
242+
if self._consumed:
243+
raise ResultConsumedError(self, _RESULT_CONSUMED_ERROR)
218244
if n is not None and len(self._record_buffer) >= n:
219245
return
220246
record_buffer = deque()
221247
async for record in self:
222248
record_buffer.append(record)
223249
if n is not None and len(record_buffer) >= n:
224250
break
225-
self._closed = False
251+
self._exhausted = False
226252
if n is None:
227253
self._record_buffer = record_buffer
228254
else:
@@ -260,6 +286,14 @@ def keys(self):
260286
"""
261287
return self._keys
262288

289+
async def _tx_end(self):
290+
# Handle closure of the associated transaction.
291+
#
292+
# This will consume the result and mark it at out of scope.
293+
# Subsequent calls to `next` will raise a ResultConsumedError.
294+
await self.consume()
295+
self._out_of_scope = True
296+
263297
async def consume(self):
264298
"""Consume the remainder of this result and return a :class:`neo4j.ResultSummary`.
265299
@@ -296,12 +330,14 @@ async def get_two_tx(tx):
296330
297331
:returns: The :class:`neo4j.ResultSummary` for this result
298332
"""
299-
if self._closed is False:
333+
if self._exhausted is False:
300334
self._discarding = True
301335
async for _ in self:
302336
pass
303337

304-
return self._obtain_summary()
338+
summary = self._obtain_summary()
339+
self._consumed = True
340+
return summary
305341

306342
async def single(self):
307343
"""Obtain the next and only remaining record from this result if available else return None.
@@ -311,16 +347,21 @@ async def single(self):
311347
the first of these is still returned.
312348
313349
:returns: the next :class:`neo4j.AsyncRecord`.
314-
:raises: ResultNotSingleError if not exactly one record is available.
350+
351+
:raises ResultNotSingleError: if not exactly one record is available.
352+
:raises ResultConsumedError: if the transaction from which this result was
353+
obtained has been closed.
315354
"""
316355
await self._buffer(2)
317356
if not self._record_buffer:
318357
raise ResultNotSingleError(
358+
self,
319359
"No records found. "
320360
"Make sure your query returns exactly one record."
321361
)
322362
elif len(self._record_buffer) > 1:
323363
raise ResultNotSingleError(
364+
self,
324365
"More than one record found. "
325366
"Make sure your query returns exactly one record."
326367
)
@@ -331,6 +372,10 @@ async def peek(self):
331372
This leaves the record in the buffer for further processing.
332373
333374
:returns: the next :class:`.Record` or :const:`None` if none remain
375+
376+
:raises ResultConsumedError: if the transaction from which this result
377+
was obtained has been closed or the Result has been explicitly
378+
consumed.
334379
"""
335380
await self._buffer(1)
336381
if self._record_buffer:
@@ -343,6 +388,10 @@ async def graph(self):
343388
344389
:returns: a result graph
345390
:rtype: :class:`neo4j.graph.Graph`
391+
392+
:raises ResultConsumedError: if the transaction from which this result
393+
was obtained has been closed or the Result has been explicitly
394+
consumed.
346395
"""
347396
await self._buffer_all()
348397
return self._hydrant.graph
@@ -354,8 +403,13 @@ async def value(self, key=0, default=None):
354403
355404
:param key: field to return for each remaining record. Obtain a single value from the record by index or key.
356405
:param default: default value, used if the index of key is unavailable
406+
357407
:returns: list of individual values
358408
:rtype: list
409+
410+
:raises ResultConsumedError: if the transaction from which this result
411+
was obtained has been closed or the Result has been explicitly
412+
consumed.
359413
"""
360414
return [record.value(key, default) async for record in self]
361415

@@ -365,8 +419,13 @@ async def values(self, *keys):
365419
See :class:`neo4j.AsyncRecord.values`
366420
367421
:param keys: fields to return for each remaining record. Optionally filtering to include only certain values by index or key.
422+
368423
:returns: list of values lists
369424
:rtype: list
425+
426+
:raises ResultConsumedError: if the transaction from which this result
427+
was obtained has been closed or the Result has been explicitly
428+
consumed.
370429
"""
371430
return [record.values(*keys) async for record in self]
372431

@@ -376,7 +435,28 @@ async def data(self, *keys):
376435
See :class:`neo4j.AsyncRecord.data`
377436
378437
:param keys: fields to return for each remaining record. Optionally filtering to include only certain values by index or key.
438+
379439
:returns: list of dictionaries
380440
:rtype: list
441+
442+
:raises ResultConsumedError: if the transaction from which this result was
443+
obtained has been closed.
381444
"""
382445
return [record.data(*keys) async for record in self]
446+
447+
def closed(self):
448+
"""Return True if the result has been closed.
449+
450+
When a result gets consumed :meth:`consume` or the transaction that
451+
owns the result gets closed (committed, rolled back, closed), the
452+
result cannot be used to acquire further records.
453+
454+
In such case, all methods that need to access the Result's records,
455+
will raise a :exc:`ResultConsumedError` when called.
456+
457+
:returns: whether the result is closed.
458+
:rtype: bool
459+
460+
.. versionadded:: 5.0
461+
"""
462+
return self._out_of_scope or self._consumed

neo4j/_async/work/transaction.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ async def _error_handler(self, exc):
7878

7979
async def _consume_results(self):
8080
for result in self._results:
81-
await result.consume()
81+
await result._tx_end()
8282
self._results = []
8383

8484
async def run(self, query, parameters=None, **kwparameters):

0 commit comments

Comments
 (0)