Skip to content

Commit 88847cc

Browse files
committed
Check Result scope and raise ResultConsumedError if appropriate (#652)
Results are tied to transactions (except auto-commit transactions). When the transaction ends, the result becomes useless. Raising instead of silently ignoring this fact will help developers to find potential bugs faster and avoid surprising and hard to understand behavior. After consuming a Result, there can never be records left. There is no meaning in trying to obtain them. Hence, we raise a `ResultConsumedError` to make the user aware of likely wrong code.
1 parent 0f10fc4 commit 88847cc

File tree

10 files changed

+223
-37
lines changed

10 files changed

+223
-37
lines changed

CHANGELOG.md

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55
- Python 3.10 support added
66
- Python 3.6 support has been dropped.
77
- `Result`, `Session`, and `Transaction` can no longer be imported from
8-
`neo4j.work`. They should've been imported from `neo4j` all along.
8+
`neo4j.work`. They should've been imported from `neo4j` all along.
9+
Remark: It's recommended to import everything needed directly from `noe4j`,
10+
not its submodules or subpackages.
911
- Experimental pipelines feature has been removed.
1012
- Experimental async driver has been added.
1113
- `ResultSummary.server.version_info` has been removed.
@@ -65,6 +67,17 @@
6567
destructor will ever be called. A `ResourceWarning` is emitted instead.
6668
Make sure to configure Python to output those warnings when developing your
6769
application locally (it does not by default).
70+
- Result scope:
71+
- Records of Results cannot be accessed (`peek`, `single`, `iter`, ...)
72+
after their owning transaction has been closed, committed, or rolled back.
73+
Previously, this would yield undefined behavior.
74+
It now raises a `ResultConsumedError`.
75+
- Records of Results cannot be accessed (`peek`, `single`, `iter`, ...)
76+
after the Result has been consumed (`Result.consume()`).
77+
Previously, this would always yield no records.
78+
It now raises a `ResultConsumedError`.
79+
- New method `Result.closed()` can be used to check for this condition if
80+
necessary.
6881

6982

7083
## Version 4.4

docs/source/api.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -809,6 +809,8 @@ A :class:`neo4j.Result` is attached to an active connection, through a :class:`n
809809

810810
.. automethod:: data
811811

812+
.. automethod:: closed
813+
812814
See https://neo4j.com/docs/driver-manual/current/cypher-workflow/#driver-type-mapping for more about type mapping.
813815

814816

docs/source/async_api.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,4 +501,6 @@ A :class:`neo4j.AsyncResult` is attached to an active connection, through a :cla
501501

502502
.. automethod:: data
503503

504+
.. automethod:: closed
505+
504506
See https://neo4j.com/docs/driver-manual/current/cypher-workflow/#driver-type-mapping for more about type mapping.

neo4j/_async/work/result.py

Lines changed: 88 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,25 @@
2020

2121
from ..._async_compat.util import AsyncUtil
2222
from ...data import DataDehydrator
23-
from ...exceptions import ResultNotSingleError
23+
from ...exceptions import (
24+
ResultConsumedError,
25+
ResultNotSingleError,
26+
)
2427
from ...work import ResultSummary
2528
from ..io import ConnectionErrorHandler
2629

2730

31+
_RESULT_OUT_OF_SCOPE_ERROR = (
32+
"The result is out of scope. The associated transaction "
33+
"has been closed. Results can only be used while the "
34+
"transaction is open."
35+
)
36+
_RESULT_CONSUMED_ERROR = (
37+
"The result has been consumed. Fetch all needed records before calling "
38+
"Result.consume()."
39+
)
40+
41+
2842
class AsyncResult:
2943
"""A handler for the result of Cypher query execution. Instances
3044
of this class are typically constructed and returned by
@@ -52,7 +66,11 @@ def __init__(self, connection, hydrant, fetch_size, on_closed,
5266
# there ar more records available to pull from the server
5367
self._has_more = False
5468
# the result has been fully iterated or consumed
55-
self._closed = False
69+
self._exhausted = False
70+
# the result has been consumed
71+
self._consumed = False
72+
# the result has been closed as a result of closing the transaction
73+
self._out_of_scope = False
5674

5775
@property
5876
def _qid(self):
@@ -194,7 +212,11 @@ async def __aiter__(self):
194212
self._pull()
195213
await self._connection.send_all()
196214

197-
self._closed = True
215+
self._exhausted = True
216+
if self._out_of_scope:
217+
raise ResultConsumedError(self, _RESULT_OUT_OF_SCOPE_ERROR)
218+
if self._consumed:
219+
raise ResultConsumedError(self, _RESULT_CONSUMED_ERROR)
198220

199221
async def __anext__(self):
200222
return await self.__aiter__().__anext__()
@@ -203,7 +225,7 @@ async def _attach(self):
203225
"""Sets the Result object in an attached state by fetching messages from
204226
the connection to the buffer.
205227
"""
206-
if self._closed is False:
228+
if self._exhausted is False:
207229
while self._attached is False:
208230
await self._connection.fetch_message()
209231

@@ -215,14 +237,18 @@ async def _buffer(self, n=None):
215237
Might ent up with fewer records in the buffer if there are not enough
216238
records available.
217239
"""
240+
if self._out_of_scope:
241+
raise ResultConsumedError(self, _RESULT_OUT_OF_SCOPE_ERROR)
242+
if self._consumed:
243+
raise ResultConsumedError(self, _RESULT_CONSUMED_ERROR)
218244
if n is not None and len(self._record_buffer) >= n:
219245
return
220246
record_buffer = deque()
221247
async for record in self:
222248
record_buffer.append(record)
223249
if n is not None and len(record_buffer) >= n:
224250
break
225-
self._closed = False
251+
self._exhausted = False
226252
if n is None:
227253
self._record_buffer = record_buffer
228254
else:
@@ -260,6 +286,14 @@ def keys(self):
260286
"""
261287
return self._keys
262288

289+
async def _tx_end(self):
290+
# Handle closure of the associated transaction.
291+
#
292+
# This will consume the result and mark it at out of scope.
293+
# Subsequent calls to `next` will raise a ResultConsumedError.
294+
await self.consume()
295+
self._out_of_scope = True
296+
263297
async def consume(self):
264298
"""Consume the remainder of this result and return a :class:`neo4j.ResultSummary`.
265299
@@ -296,12 +330,14 @@ async def get_two_tx(tx):
296330
297331
:returns: The :class:`neo4j.ResultSummary` for this result
298332
"""
299-
if self._closed is False:
333+
if self._exhausted is False:
300334
self._discarding = True
301335
async for _ in self:
302336
pass
303337

304-
return self._obtain_summary()
338+
summary = self._obtain_summary()
339+
self._consumed = True
340+
return summary
305341

306342
async def single(self):
307343
"""Obtain the next and only remaining record from this result if available else return None.
@@ -311,16 +347,21 @@ async def single(self):
311347
the first of these is still returned.
312348
313349
:returns: the next :class:`neo4j.AsyncRecord`.
314-
:raises: ResultNotSingleError if not exactly one record is available.
350+
351+
:raises ResultNotSingleError: if not exactly one record is available.
352+
:raises ResultConsumedError: if the transaction from which this result was
353+
obtained has been closed.
315354
"""
316355
await self._buffer(2)
317356
if not self._record_buffer:
318357
raise ResultNotSingleError(
358+
self,
319359
"No records found. "
320360
"Make sure your query returns exactly one record."
321361
)
322362
elif len(self._record_buffer) > 1:
323363
raise ResultNotSingleError(
364+
self,
324365
"More than one record found. "
325366
"Make sure your query returns exactly one record."
326367
)
@@ -331,6 +372,10 @@ async def peek(self):
331372
This leaves the record in the buffer for further processing.
332373
333374
:returns: the next :class:`.Record` or :const:`None` if none remain
375+
376+
:raises ResultConsumedError: if the transaction from which this result
377+
was obtained has been closed or the Result has been explicitly
378+
consumed.
334379
"""
335380
await self._buffer(1)
336381
if self._record_buffer:
@@ -343,6 +388,10 @@ async def graph(self):
343388
344389
:returns: a result graph
345390
:rtype: :class:`neo4j.graph.Graph`
391+
392+
:raises ResultConsumedError: if the transaction from which this result
393+
was obtained has been closed or the Result has been explicitly
394+
consumed.
346395
"""
347396
await self._buffer_all()
348397
return self._hydrant.graph
@@ -354,8 +403,13 @@ async def value(self, key=0, default=None):
354403
355404
:param key: field to return for each remaining record. Obtain a single value from the record by index or key.
356405
:param default: default value, used if the index of key is unavailable
406+
357407
:returns: list of individual values
358408
:rtype: list
409+
410+
:raises ResultConsumedError: if the transaction from which this result
411+
was obtained has been closed or the Result has been explicitly
412+
consumed.
359413
"""
360414
return [record.value(key, default) async for record in self]
361415

@@ -365,8 +419,13 @@ async def values(self, *keys):
365419
See :class:`neo4j.AsyncRecord.values`
366420
367421
:param keys: fields to return for each remaining record. Optionally filtering to include only certain values by index or key.
422+
368423
:returns: list of values lists
369424
:rtype: list
425+
426+
:raises ResultConsumedError: if the transaction from which this result
427+
was obtained has been closed or the Result has been explicitly
428+
consumed.
370429
"""
371430
return [record.values(*keys) async for record in self]
372431

@@ -376,7 +435,28 @@ async def data(self, *keys):
376435
See :class:`neo4j.AsyncRecord.data`
377436
378437
:param keys: fields to return for each remaining record. Optionally filtering to include only certain values by index or key.
438+
379439
:returns: list of dictionaries
380440
:rtype: list
441+
442+
:raises ResultConsumedError: if the transaction from which this result was
443+
obtained has been closed.
381444
"""
382445
return [record.data(*keys) async for record in self]
446+
447+
def closed(self):
448+
"""Return True if the result has been closed.
449+
450+
When a result gets consumed :meth:`consume` or the transaction that
451+
owns the result gets closed (committed, rolled back, closed), the
452+
result cannot be used to acquire further records.
453+
454+
In such case, all methods that need to access the Result's records,
455+
will raise a :exc:`ResultConsumedError` when called.
456+
457+
:returns: whether the result is closed.
458+
:rtype: bool
459+
460+
.. versionadded:: 5.0
461+
"""
462+
return self._out_of_scope or self._consumed

neo4j/_async/work/transaction.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ async def _error_handler(self, exc):
7878

7979
async def _consume_results(self):
8080
for result in self._results:
81-
await result.consume()
81+
await result._tx_end()
8282
self._results = []
8383

8484
async def run(self, query, parameters=None, **kwparameters):

0 commit comments

Comments
 (0)