-
Notifications
You must be signed in to change notification settings - Fork 77
/
Copy pathcursor.py
344 lines (278 loc) · 10.9 KB
/
cursor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
__all__ = ["Cursor"]
from collections import deque
from typing import Any, Deque, Optional, Sequence
from arango.connection import BaseConnection
from arango.exceptions import (
CursorCloseError,
CursorCountError,
CursorEmptyError,
CursorNextError,
CursorStateError,
)
from arango.request import Request
from arango.typings import Json
class Cursor:
    """Cursor API wrapper.

    Cursors fetch query results from ArangoDB server in batches. Cursor objects
    are *stateful* as they store the fetched items in-memory. They must not be
    shared across threads without proper locking mechanism.

    :param connection: HTTP connection.
    :param init_data: Cursor initialization data.
    :type init_data: dict
    :param cursor_type: Cursor type ("cursor" or "export").
    :type cursor_type: str
    :param allow_retry: If set to True, the cursor will always attempt to fetch
        the latest batch from server even if the previous attempt failed.
        This option is only available for server versions 3.11 and above.
    :type allow_retry: bool
    """

    __slots__ = [
        "_conn",
        "_type",
        "_id",
        "_count",
        "_cached",
        "_stats",
        "_plan",
        "_profile",
        "_warnings",
        "_has_more",
        "_batch",
        "_next_batch_id",
        "_allow_retry",
    ]

    # (server key, client key) pairs applied, in this order, to the "stats"
    # object returned by the server. Keys not listed here (for example
    # "cursorsCreated", "cursorsRearmed", "cacheHits", "cacheMisses") are
    # exposed unchanged, exactly as they were before this table existed.
    _STAT_RENAMES = (
        ("writesExecuted", "modified"),
        ("writesIgnored", "ignored"),
        ("documentLookups", "lookups"),
        ("scannedFull", "scanned_full"),
        ("scannedIndex", "scanned_index"),
        ("executionTime", "execution_time"),
        ("httpRequests", "http_requests"),
        # New in 3.11
        ("peakMemoryUsage", "peak_memory_usage"),
        ("intermediateCommits", "intermediate_commits"),
    )

    def __init__(
        self,
        connection: BaseConnection,
        init_data: Json,
        cursor_type: str = "cursor",
        allow_retry: bool = False,
    ) -> None:
        self._conn = connection
        self._type = cursor_type
        self._allow_retry = allow_retry
        self._batch: Deque[Any] = deque()
        self._id: Optional[str] = None
        self._count: Optional[int] = None
        self._cached: Optional[bool] = None
        self._stats: Optional[Json] = None
        self._plan: Optional[Json] = None
        self._profile: Optional[Json] = None
        self._warnings: Optional[Sequence[Json]] = None
        # Always overwritten by _update() below; initialised so the slot is
        # never left unset if _update() is refactored later.
        self._has_more: Optional[bool] = None
        self._next_batch_id: Optional[str] = None
        self._update(init_data)

    def __iter__(self) -> "Cursor":
        return self

    def __next__(self) -> Any:  # pragma: no cover
        return self.next()

    def __enter__(self) -> "Cursor":
        return self

    def __exit__(self, *_: Any) -> None:
        # Leaving a ``with`` block releases the server-side cursor; a cursor
        # that is already gone on the server is not treated as an error.
        self.close(ignore_missing=True)

    def __len__(self) -> int:
        """Return the total result count.

        :raise arango.exceptions.CursorCountError: If the count was not
            returned by the server (count option not enabled).
        """
        if self._count is None:
            raise CursorCountError("cursor count not enabled")
        return self._count

    def __repr__(self) -> str:
        return f"<Cursor {self._id}>" if self._id else "<Cursor>"

    @classmethod
    def _format_stats(cls, stats: Json) -> Json:
        """Rename server-side statistics keys in place.

        :param stats: The "stats" object from the server's "extra" field.
            It is mutated in place.
        :type stats: dict
        :return: The same dict, with keys renamed per ``_STAT_RENAMES``.
        :rtype: dict
        """
        for server_key, client_key in cls._STAT_RENAMES:
            if server_key in stats:
                stats[client_key] = stats.pop(server_key)
        return stats

    def _update(self, data: Json) -> Json:
        """Update the cursor using data from ArangoDB server.

        :param data: Cursor data from ArangoDB server (e.g. results). The
            "hasMore" and "result" keys are required.
        :type data: dict
        :return: Updated cursor data.
        :rtype: dict
        """
        result: Json = {}

        if "id" in data:
            self._id = data["id"]
            result["id"] = data["id"]
        if "count" in data:
            self._count = data["count"]
            result["count"] = data["count"]
        if "cached" in data:
            self._cached = data["cached"]
            result["cached"] = data["cached"]

        # New in 3.11
        if "nextBatchId" in data:
            # This is only available for server versions 3.11 and above.
            # Currently, we are testing against 3.10.9
            self._next_batch_id = data["nextBatchId"]  # pragma: no cover
            result["next_batch_id"] = data["nextBatchId"]  # pragma: no cover

        self._has_more = bool(data["hasMore"])
        result["has_more"] = data["hasMore"]

        # New items are appended after any not-yet-consumed items.
        self._batch.extend(data["result"])
        result["batch"] = data["result"]

        if "extra" in data:
            extra = data["extra"]

            if "profile" in extra:
                self._profile = extra["profile"]
                result["profile"] = extra["profile"]

            if "warnings" in extra:
                self._warnings = extra["warnings"]
                result["warnings"] = extra["warnings"]

            if "plan" in extra:
                self._plan = extra["plan"]
                result["plan"] = extra["plan"]

            if "stats" in extra:
                stats = self._format_stats(extra["stats"])
                self._stats = stats
                result["statistics"] = stats

        return result

    @property
    def id(self) -> Optional[str]:
        """Return the cursor ID.

        :return: Cursor ID.
        :rtype: str
        """
        return self._id

    @property
    def type(self) -> str:
        """Return the cursor type.

        :return: Cursor type ("cursor" or "export").
        :rtype: str
        """
        return self._type

    def batch(self) -> Optional[Deque[Any]]:
        """Return the current batch of results.

        :return: Current batch.
        :rtype: collections.deque
        """
        return self._batch

    def has_more(self) -> Optional[bool]:
        """Return True if more results are available on the server.

        :return: True if more results are available on the server.
        :rtype: bool
        """
        return self._has_more

    def count(self) -> Optional[int]:
        """Return the total number of documents in the entire result set.

        :return: Total number of documents, or None if the count option
            was not enabled during cursor initialization.
        :rtype: int | None
        """
        return self._count

    def cached(self) -> Optional[bool]:
        """Return True if results are cached.

        :return: True if results are cached.
        :rtype: bool
        """
        return self._cached

    def statistics(self) -> Optional[Json]:
        """Return cursor statistics.

        :return: Cursor statistics.
        :rtype: dict
        """
        return self._stats

    def profile(self) -> Optional[Json]:
        """Return cursor performance profile.

        :return: Cursor performance profile.
        :rtype: dict
        """
        return self._profile

    def warnings(self) -> Optional[Sequence[Json]]:
        """Return any warnings from the query execution.

        :return: Warnings, or None if there are none.
        :rtype: [dict]
        """
        return self._warnings

    def plan(self) -> Optional[Json]:
        """Return query execution plan.

        :return: Query execution plan.
        :rtype: dict
        """
        return self._plan

    def empty(self) -> bool:
        """Check if the current batch is empty.

        :return: True if current batch is empty, False otherwise.
        :rtype: bool
        """
        return len(self._batch) == 0

    def next(self) -> Any:
        """Pop the next item from the current batch.

        If current batch is empty/depleted, API requests are automatically
        sent to ArangoDB server to fetch the next batch and update the cursor.

        :return: Next item in current batch.
        :raise StopIteration: If the result set is depleted.
        :raise arango.exceptions.CursorNextError: If batch retrieval fails.
        :raise arango.exceptions.CursorStateError: If cursor ID is not set.
        """
        # Keep fetching while the local batch is empty: should the server ever
        # return an empty batch while still reporting hasMore, popping right
        # away would raise CursorEmptyError instead of continuing iteration.
        while self.empty():
            if not self.has_more():
                raise StopIteration
            self.fetch()
        return self.pop()

    def pop(self) -> Any:
        """Pop the next item from current batch.

        If current batch is empty/depleted, an exception is raised. You must
        call :func:`arango.cursor.Cursor.fetch` to manually fetch the next
        batch from server.

        :return: Next item in current batch.
        :raise arango.exceptions.CursorEmptyError: If current batch is empty.
        """
        if len(self._batch) == 0:
            raise CursorEmptyError("current batch is empty")
        return self._batch.popleft()

    def fetch(self) -> Json:
        """Fetch the next batch from server and update the cursor.

        :return: New batch details.
        :rtype: dict
        :raise arango.exceptions.CursorNextError: If batch retrieval fails.
        :raise arango.exceptions.CursorStateError: If cursor ID is not set.
        """
        if self._id is None:
            raise CursorStateError("cursor ID not set")

        endpoint = f"/_api/{self._type}/{self._id}"
        # With retries enabled (3.11+), the batch ID is appended to the path,
        # so a repeated request asks for the same batch again.
        if self._allow_retry and self._next_batch_id is not None:
            endpoint += f"/{self._next_batch_id}"  # pragma: no cover

        request = Request(method="post", endpoint=endpoint)
        resp = self._conn.send_request(request)

        if not resp.is_success:
            raise CursorNextError(resp, request)

        return self._update(resp.body)

    def close(self, ignore_missing: bool = False) -> Optional[bool]:
        """Close the cursor and free any server resources tied to it.

        :param ignore_missing: Do not raise exception on missing cursors.
        :type ignore_missing: bool
        :return: True if cursor was closed successfully, False if cursor was
            missing on the server and **ignore_missing** was set to True, None
            if there are no cursors to close server-side (e.g. result set is
            smaller than the batch size).
        :rtype: bool | None
        :raise arango.exceptions.CursorCloseError: If operation fails.
        """
        if self._id is None:
            return None

        request = Request(method="delete", endpoint=f"/_api/{self._type}/{self._id}")
        resp = self._conn.send_request(request)

        if resp.is_success:
            return True
        if resp.status_code == 404 and ignore_missing:
            return False
        raise CursorCloseError(resp, request)