-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathcollection.py
360 lines (303 loc) · 12.7 KB
/
collection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
# Public API of this module: the base collection wrapper and the standard
# collection wrapper defined below.
__all__ = ["Collection", "StandardCollection"]
from typing import Generic, Optional, Tuple, TypeVar, cast
from arangoasync.errno import (
HTTP_BAD_PARAMETER,
HTTP_NOT_FOUND,
HTTP_PRECONDITION_FAILED,
)
from arangoasync.exceptions import (
CollectionPropertiesError,
DocumentGetError,
DocumentInsertError,
DocumentParseError,
DocumentRevisionError,
)
from arangoasync.executor import ApiExecutor
from arangoasync.request import Method, Request
from arangoasync.response import Response
from arangoasync.serialization import Deserializer, Serializer
from arangoasync.typings import CollectionProperties, Json, Params, Result
# Type of documents handed to the document serializer (``Serializer[T]``);
# this is the type ``StandardCollection.insert`` accepts.
T = TypeVar("T")
# First type parameter of ``Deserializer[U, V]``; the type produced by
# ``doc_deserializer.loads`` and returned by ``StandardCollection.get``.
U = TypeVar("U")
# Second type parameter of ``Deserializer[U, V]``. Not used by the methods in
# this module -- presumably the type for multiple documents; TODO confirm
# against ``arangoasync.serialization.Deserializer``.
V = TypeVar("V")
class Collection(Generic[T, U, V]):
    """Base class for collection API wrappers.

    Stores the API executor, the collection name and the document
    (de)serializers, and provides helpers that turn a document ID, key or
    body into a validated document ID of the form ``"<collection>/<key>"``.

    Args:
        executor (ApiExecutor): API executor.
        name (str): Collection name
        doc_serializer (Serializer): Document serializer.
        doc_deserializer (Deserializer): Document deserializer.
    """

    def __init__(
        self,
        executor: ApiExecutor,
        name: str,
        doc_serializer: Serializer[T],
        doc_deserializer: Deserializer[U, V],
    ) -> None:
        self._executor = executor
        self._name = name
        self._doc_serializer = doc_serializer
        self._doc_deserializer = doc_deserializer
        # Document IDs belonging to this collection all start with "<name>/".
        self._id_prefix = f"{self._name}/"

    def _validate_id(self, doc_id: str) -> str:
        """Check the collection name in the document ID.

        Args:
            doc_id (str): Document ID.

        Returns:
            str: Verified document ID (unchanged).

        Raises:
            DocumentParseError: On bad collection name.
        """
        if not doc_id.startswith(self._id_prefix):
            raise DocumentParseError(f'Bad collection name in document ID "{doc_id}"')
        return doc_id

    def _extract_id(self, body: Json) -> str:
        """Extract the document ID from document body.

        The "_id" field wins when present (and is validated against this
        collection); otherwise the ID is built from the "_key" field.

        Args:
            body (dict): Document body.

        Returns:
            str: Document ID.

        Raises:
            DocumentParseError: On missing ID and key, or on an "_id" that
                belongs to a different collection.
        """
        if "_id" in body:
            return self._validate_id(body["_id"])
        try:
            key: str = body["_key"]
        except KeyError:
            # The KeyError is an implementation detail; don't chain it.
            raise DocumentParseError('Field "_key" or "_id" required') from None
        return self._id_prefix + key

    def _ensure_key_from_id(self, body: Json) -> Json:
        """Return the body with "_key" field if it has "_id" field.

        The input is never mutated: when a "_key" has to be derived, a shallow
        copy of the body is returned instead.

        Args:
            body (dict): Document body.

        Returns:
            dict: Document body with "_key" field if it has "_id" field.

        Raises:
            DocumentParseError: If "_id" belongs to a different collection.
        """
        if "_id" in body and "_key" not in body:
            doc_id = self._validate_id(body["_id"])
            body = body.copy()
            # Safe: _validate_id guarantees doc_id starts with the prefix.
            body["_key"] = doc_id.removeprefix(self._id_prefix)
        return body

    def _prep_from_doc(
        self,
        document: str | Json,
        rev: Optional[str] = None,
        check_rev: bool = False,
    ) -> Tuple[str, Json]:
        """Resolve a document reference to its ID and request headers.

        Args:
            document (str | dict): Document ID, key or body.
            rev (str | None): Document revision. When truthy it takes
                precedence over the "_rev" field of a document body.
            check_rev (bool): Whether to check the revision.

        Returns:
            tuple: Document ID and a new request-headers dict. The headers
            contain "If-Match" only when **check_rev** is true and a revision
            is known; otherwise they are empty.

        Raises:
            DocumentParseError: On missing ID and key, or bad collection name.
            TypeError: On bad document type.
        """
        if isinstance(document, dict):
            doc_id = self._extract_id(document)
            rev = rev or document.get("_rev")
        elif isinstance(document, str):
            # A "/" means a full document ID; otherwise treat it as a key.
            if "/" in document:
                doc_id = self._validate_id(document)
            else:
                doc_id = self._id_prefix + document
        else:
            raise TypeError("Document must be str or a dict")

        if check_rev and rev is not None:
            return doc_id, {"If-Match": rev}
        return doc_id, {}

    @property
    def name(self) -> str:
        """Return the name of the collection.

        Returns:
            str: Collection name.
        """
        return self._name

    @property
    def db_name(self) -> str:
        """Return the name of the current database.

        Returns:
            str: Database name.
        """
        return self._executor.db_name
class StandardCollection(Collection[T, U, V]):
    """Standard collection API wrapper.

    Args:
        executor (ApiExecutor): API executor.
        name (str): Collection name
        doc_serializer (Serializer): Document serializer.
        doc_deserializer (Deserializer): Document deserializer.
    """

    def __init__(
        self,
        executor: ApiExecutor,
        name: str,
        doc_serializer: Serializer[T],
        doc_deserializer: Deserializer[U, V],
    ) -> None:
        super().__init__(executor, name, doc_serializer, doc_deserializer)

    def __repr__(self) -> str:
        return f"<StandardCollection {self.name}>"

    async def properties(self) -> Result[CollectionProperties]:
        """Return the full properties of the current collection.

        Returns:
            CollectionProperties: Properties.

        Raises:
            CollectionPropertiesError: If retrieval fails.

        References:
            - `get-the-properties-of-a-collection <https://docs.arangodb.com/stable/develop/http-api/collections/#get-the-properties-of-a-collection>`__
        """  # noqa: E501
        request = Request(
            method=Method.GET,
            endpoint=f"/_api/collection/{self.name}/properties",
        )

        def response_handler(resp: Response) -> CollectionProperties:
            if not resp.is_success:
                raise CollectionPropertiesError(resp, request)
            return CollectionProperties(self._executor.deserialize(resp.raw_body))

        return await self._executor.execute(request, response_handler)

    async def get(
        self,
        document: str | Json,
        rev: Optional[str] = None,
        check_rev: bool = True,
        allow_dirty_read: bool = False,
    ) -> Result[Optional[U]]:
        """Return a document.

        Args:
            document (str | dict): Document ID, key or body.
                Document body must contain the "_id" or "_key" field.
            rev (str | None): Expected document revision. Overrides the
                value of "_rev" field in **document** if present.
            check_rev (bool): If set to True, revision of **document** (if given)
                is compared against the revision of target document.
            allow_dirty_read (bool): Allow reads from followers in a cluster.

        Returns:
            Document or None if not found.

        Raises:
            DocumentRevisionError: If the revision is incorrect.
            DocumentGetError: If retrieval fails.

        References:
            - `get-a-document <https://docs.arangodb.com/stable/develop/http-api/documents/#get-a-document>`__
        """  # noqa: E501
        handle, headers = self._prep_from_doc(document, rev, check_rev)
        if allow_dirty_read:
            # _prep_from_doc returns a fresh dict, so adding to it is safe.
            headers["x-arango-allow-dirty-read"] = "true"

        request = Request(
            method=Method.GET,
            endpoint=f"/_api/document/{handle}",
            headers=headers,
        )

        def response_handler(resp: Response) -> Optional[U]:
            if resp.is_success:
                return self._doc_deserializer.loads(resp.raw_body)
            elif resp.status_code == HTTP_NOT_FOUND:
                return None
            elif resp.status_code == HTTP_PRECONDITION_FAILED:
                # The "If-Match" revision did not match the stored document.
                raise DocumentRevisionError(resp, request)
            else:
                raise DocumentGetError(resp, request)

        return await self._executor.execute(request, response_handler)

    async def insert(
        self,
        document: T,
        wait_for_sync: Optional[bool] = None,
        return_new: Optional[bool] = None,
        return_old: Optional[bool] = None,
        silent: Optional[bool] = None,
        overwrite: Optional[bool] = None,
        overwrite_mode: Optional[str] = None,
        keep_null: Optional[bool] = None,
        merge_objects: Optional[bool] = None,
        refill_index_caches: Optional[bool] = None,
        version_attribute: Optional[str] = None,
    ) -> Result[bool | Json]:
        """Insert a new document.

        Args:
            document (dict): Document to insert. If it contains the "_key" or "_id"
                field, the value is used as the key of the new document (otherwise
                it is auto-generated). Any "_rev" field is ignored.
            wait_for_sync (bool | None): Wait until document has been synced to disk.
            return_new (bool | None): Additionally return the complete new document
                under the attribute `new` in the result.
            return_old (bool | None): Additionally return the complete old document
                under the attribute `old` in the result. Only available if the
                `overwrite` option is used.
            silent (bool | None): If set to `True`, no document metadata is returned.
                This can be used to save resources.
            overwrite (bool | None): If set to `True`, operation does not fail on
                duplicate key and existing document is overwritten (replace-insert).
            overwrite_mode (str | None): Overwrite mode. Supersedes **overwrite**
                option. May be one of "ignore", "replace", "update" or "conflict".
            keep_null (bool | None): If set to `True`, fields with value None are
                retained in the document. Otherwise, they are removed completely.
                Applies only when **overwrite_mode** is set to "update"
                (update-insert).
            merge_objects (bool | None): If set to True, sub-dictionaries are merged
                instead of the new one overwriting the old one. Applies only when
                **overwrite_mode** is set to "update" (update-insert).
            refill_index_caches (bool | None): Whether to add new entries to
                in-memory index caches if document insertions affect the edge index
                or cache-enabled persistent indexes.
            version_attribute (str | None): Support for simple external versioning to
                document operations. Only applicable if **overwrite** is set to true
                or **overwrite_mode** is set to "update" or "replace".

        Returns:
            bool | dict: Document metadata (e.g. document id, key, revision) or `True`
                if **silent** is set to `True`.

        Raises:
            DocumentInsertError: If insertion fails.

        References:
            - `create-a-document <https://docs.arangodb.com/stable/develop/http-api/documents/#create-a-document>`__
        """  # noqa: E501
        if isinstance(document, dict):
            # We assume that the document serializer works with dictionaries,
            # so deriving "_key" from "_id" here is safe.
            document = cast(T, self._ensure_key_from_id(document))

        # Query parameter name -> caller-supplied value. Only explicitly set
        # options are sent; omitted ones are left to the server.
        options: dict[str, Optional[bool | str]] = {
            "waitForSync": wait_for_sync,
            "returnNew": return_new,
            "returnOld": return_old,
            "silent": silent,
            "overwrite": overwrite,
            "overwriteMode": overwrite_mode,
            "keepNull": keep_null,
            "mergeObjects": merge_objects,
            "refillIndexCaches": refill_index_caches,
            "versionAttribute": version_attribute,
        }
        params: Params = {
            key: value for key, value in options.items() if value is not None
        }

        request = Request(
            method=Method.POST,
            endpoint=f"/_api/document/{self._name}",
            params=params,
            data=self._doc_serializer.dumps(document),
        )

        def response_handler(resp: Response) -> bool | Json:
            if resp.is_success:
                if silent is True:
                    return True
                return self._executor.deserialize(resp.raw_body)
            msg: Optional[str] = None
            if resp.status_code == HTTP_BAD_PARAMETER:
                msg = (
                    "Body does not contain a valid JSON representation of "
                    "one document."
                )
            elif resp.status_code == HTTP_NOT_FOUND:
                msg = "Collection not found."
            raise DocumentInsertError(resp, request, msg)

        return await self._executor.execute(request, response_handler)