20
20
21
21
from ..._async_compat .util import AsyncUtil
22
22
from ...data import DataDehydrator
23
- from ...exceptions import ResultNotSingleError
23
+ from ...exceptions import (
24
+ ResultConsumedError ,
25
+ ResultNotSingleError ,
26
+ )
24
27
from ...work import ResultSummary
25
28
from ..io import ConnectionErrorHandler
26
29
27
30
31
+ _RESULT_OUT_OF_SCOPE_ERROR = (
32
+ "The result is out of scope. The associated transaction "
33
+ "has been closed. Results can only be used while the "
34
+ "transaction is open."
35
+ )
36
+ _RESULT_CONSUMED_ERROR = (
37
+ "The result has been consumed. Fetch all needed records before calling "
38
+ "Result.consume()."
39
+ )
40
+
41
+
28
42
class AsyncResult :
29
43
"""A handler for the result of Cypher query execution. Instances
30
44
of this class are typically constructed and returned by
@@ -53,6 +67,10 @@ def __init__(self, connection, hydrant, fetch_size, on_closed,
53
67
self ._has_more = False
54
68
# the result has been fully iterated or consumed
55
69
self ._closed = False
70
+ # the result has been consumed
71
+ self ._consumed = False
72
+ # the result has been closed as a result of closing the transaction
73
+ self ._out_of_scope = False
56
74
57
75
@property
58
76
def _qid (self ):
@@ -195,6 +213,10 @@ async def __aiter__(self):
195
213
await self ._connection .send_all ()
196
214
197
215
self ._closed = True
216
+ if self ._out_of_scope :
217
+ raise ResultConsumedError (self , _RESULT_OUT_OF_SCOPE_ERROR )
218
+ if self ._consumed :
219
+ raise ResultConsumedError (self , _RESULT_CONSUMED_ERROR )
198
220
199
221
async def __anext__ (self ):
200
222
return await self .__aiter__ ().__anext__ ()
@@ -215,6 +237,10 @@ async def _buffer(self, n=None):
215
237
Might ent up with fewer records in the buffer if there are not enough
216
238
records available.
217
239
"""
240
+ if self ._out_of_scope :
241
+ raise ResultConsumedError (self , _RESULT_OUT_OF_SCOPE_ERROR )
242
+ if self ._consumed :
243
+ raise ResultConsumedError (self , _RESULT_CONSUMED_ERROR )
218
244
if n is not None and len (self ._record_buffer ) >= n :
219
245
return
220
246
record_buffer = deque ()
@@ -260,6 +286,14 @@ def keys(self):
260
286
"""
261
287
return self ._keys
262
288
289
+ async def _tx_end (self ):
290
+ # Handle closure of the associated transaction.
291
+ #
292
+ # This will consume the result and mark it at out of scope.
293
+ # Subsequent calls to `next` will raise a ResultConsumedError.
294
+ await self .consume ()
295
+ self ._out_of_scope = True
296
+
263
297
async def consume (self ):
264
298
"""Consume the remainder of this result and return a :class:`neo4j.ResultSummary`.
265
299
@@ -301,7 +335,9 @@ async def get_two_tx(tx):
301
335
async for _ in self :
302
336
pass
303
337
304
- return self ._obtain_summary ()
338
+ summary = self ._obtain_summary ()
339
+ self ._consumed = True
340
+ return summary
305
341
306
342
async def single (self ):
307
343
"""Obtain the next and only remaining record from this result if available else return None.
@@ -311,16 +347,21 @@ async def single(self):
311
347
the first of these is still returned.
312
348
313
349
:returns: the next :class:`neo4j.AsyncRecord`.
314
- :raises: ResultNotSingleError if not exactly one record is available.
350
+
351
+ :raises ResultNotSingleError: if not exactly one record is available.
352
+ :raises ResultConsumedError: if the transaction from which this result was
353
+ obtained has been closed.
315
354
"""
316
355
await self ._buffer (2 )
317
356
if not self ._record_buffer :
318
357
raise ResultNotSingleError (
358
+ self ,
319
359
"No records found. "
320
360
"Make sure your query returns exactly one record."
321
361
)
322
362
elif len (self ._record_buffer ) > 1 :
323
363
raise ResultNotSingleError (
364
+ self ,
324
365
"More than one record found. "
325
366
"Make sure your query returns exactly one record."
326
367
)
@@ -331,6 +372,10 @@ async def peek(self):
331
372
This leaves the record in the buffer for further processing.
332
373
333
374
:returns: the next :class:`.Record` or :const:`None` if none remain
375
+
376
+ :raises ResultConsumedError: if the transaction from which this result
377
+ was obtained has been closed or the Result has been explicitly
378
+ consumed.
334
379
"""
335
380
await self ._buffer (1 )
336
381
if self ._record_buffer :
@@ -343,6 +388,10 @@ async def graph(self):
343
388
344
389
:returns: a result graph
345
390
:rtype: :class:`neo4j.graph.Graph`
391
+
392
+ :raises ResultConsumedError: if the transaction from which this result
393
+ was obtained has been closed or the Result has been explicitly
394
+ consumed.
346
395
"""
347
396
await self ._buffer_all ()
348
397
return self ._hydrant .graph
@@ -354,8 +403,13 @@ async def value(self, key=0, default=None):
354
403
355
404
:param key: field to return for each remaining record. Obtain a single value from the record by index or key.
356
405
:param default: default value, used if the index of key is unavailable
406
+
357
407
:returns: list of individual values
358
408
:rtype: list
409
+
410
+ :raises ResultConsumedError: if the transaction from which this result
411
+ was obtained has been closed or the Result has been explicitly
412
+ consumed.
359
413
"""
360
414
return [record .value (key , default ) async for record in self ]
361
415
@@ -365,8 +419,13 @@ async def values(self, *keys):
365
419
See :class:`neo4j.AsyncRecord.values`
366
420
367
421
:param keys: fields to return for each remaining record. Optionally filtering to include only certain values by index or key.
422
+
368
423
:returns: list of values lists
369
424
:rtype: list
425
+
426
+ :raises ResultConsumedError: if the transaction from which this result
427
+ was obtained has been closed or the Result has been explicitly
428
+ consumed.
370
429
"""
371
430
return [record .values (* keys ) async for record in self ]
372
431
@@ -376,7 +435,26 @@ async def data(self, *keys):
376
435
See :class:`neo4j.AsyncRecord.data`
377
436
378
437
:param keys: fields to return for each remaining record. Optionally filtering to include only certain values by index or key.
438
+
379
439
:returns: list of dictionaries
380
440
:rtype: list
441
+
442
+ :raises ResultConsumedError: if the transaction from which this result was
443
+ obtained has been closed.
381
444
"""
382
445
return [record .data (* keys ) async for record in self ]
446
+
447
+ def closed (self ):
448
+ """Return True if the result is still valid (not closed).
449
+
450
+ When a result gets consumed :meth:`consume` or the transaction that
451
+ owns the result gets closed (committed, rolled back, closed), the
452
+ result cannot be used to acquire further records.
453
+
454
+ In such case, all methods that need to access the Result's records,
455
+ will raise a :exc:`ResultConsumedError` when called.
456
+
457
+ :returns: whether the result is closed.
458
+ :rtype: bool
459
+ """
460
+ return self ._out_of_scope or self ._consumed
0 commit comments