|
1 | 1 | __all__ = [
|
2 | 2 | "Database",
|
3 | 3 | "StandardDatabase",
|
| 4 | + "TransactionDatabase", |
4 | 5 | ]
|
5 | 6 |
|
6 | 7 |
|
|
24 | 25 | PermissionResetError,
|
25 | 26 | PermissionUpdateError,
|
26 | 27 | ServerStatusError,
|
| 28 | + TransactionAbortError, |
| 29 | + TransactionCommitError, |
| 30 | + TransactionInitError, |
| 31 | + TransactionListError, |
| 32 | + TransactionStatusError, |
27 | 33 | UserCreateError,
|
28 | 34 | UserDeleteError,
|
29 | 35 | UserGetError,
|
30 | 36 | UserListError,
|
31 | 37 | UserReplaceError,
|
32 | 38 | UserUpdateError,
|
33 | 39 | )
|
34 |
| -from arangoasync.executor import ApiExecutor, DefaultApiExecutor |
| 40 | +from arangoasync.executor import ApiExecutor, DefaultApiExecutor, TransactionApiExecutor |
35 | 41 | from arangoasync.request import Method, Request
|
36 | 42 | from arangoasync.response import Response
|
37 | 43 | from arangoasync.serialization import Deserializer, Serializer
|
@@ -84,6 +90,16 @@ def deserializer(self) -> Deserializer[Json, Jsons]:
|
84 | 90 | """Return the deserializer."""
|
85 | 91 | return self._executor.deserializer
|
86 | 92 |
|
| 93 | + @property |
| 94 | + def context(self) -> str: |
| 95 | + """Return the API execution context. |
| 96 | +
|
| 97 | + Returns: |
| 98 | + str: API execution context. Possible values are "default", "transaction". |
| 99 | + :rtype: str |
| 100 | + """ |
| 101 | + return self._executor.context |
| 102 | + |
87 | 103 | async def properties(self) -> Result[DatabaseProperties]:
|
88 | 104 | """Return database properties.
|
89 | 105 |
|
@@ -1065,7 +1081,209 @@ def response_handler(resp: Response) -> Json:
|
1065 | 1081 |
|
1066 | 1082 |
|
1067 | 1083 | class StandardDatabase(Database):
|
1068 |
| - """Standard database API wrapper.""" |
| 1084 | + """Standard database API wrapper. |
| 1085 | +
|
| 1086 | + Args: |
| 1087 | + connection (Connection): Connection object to be used by the API executor. |
| 1088 | + """ |
1069 | 1089 |
|
1070 | 1090 | def __init__(self, connection: Connection) -> None:
|
1071 | 1091 | super().__init__(DefaultApiExecutor(connection))
|
| 1092 | + |
| 1093 | + def __repr__(self) -> str: |
| 1094 | + return f"<StandardDatabase {self.name}>" |
| 1095 | + |
| 1096 | + async def begin_transaction( |
| 1097 | + self, |
| 1098 | + read: Optional[str | Sequence[str]] = None, |
| 1099 | + write: Optional[str | Sequence[str]] = None, |
| 1100 | + exclusive: Optional[str | Sequence[str]] = None, |
| 1101 | + wait_for_sync: Optional[bool] = None, |
| 1102 | + allow_implicit: Optional[bool] = None, |
| 1103 | + lock_timeout: Optional[int] = None, |
| 1104 | + max_transaction_size: Optional[int] = None, |
| 1105 | + allow_dirty_read: Optional[bool] = None, |
| 1106 | + skip_fast_lock_round: Optional[bool] = None, |
| 1107 | + ) -> "TransactionDatabase": |
| 1108 | + """Begin a Stream Transaction. |
| 1109 | +
|
| 1110 | + Args: |
| 1111 | + read (str | list | None): Name(s) of collections read during transaction. |
| 1112 | + Read-only collections are added lazily but should be declared if |
| 1113 | + possible to avoid deadlocks. |
| 1114 | + write (str | list | None): Name(s) of collections written to during |
| 1115 | + transaction with shared access. |
| 1116 | + exclusive (str | list | None): Name(s) of collections written to during |
| 1117 | + transaction with exclusive access. |
| 1118 | + wait_for_sync (bool | None): If `True`, will force the transaction to write |
| 1119 | + all data to disk before returning |
| 1120 | + allow_implicit (bool | None): Allow reading from undeclared collections. |
| 1121 | + lock_timeout (int | None): Timeout for waiting on collection locks. Setting |
| 1122 | + it to 0 will make ArangoDB not time out waiting for a lock. |
| 1123 | + max_transaction_size (int | None): Transaction size limit in bytes. |
| 1124 | + allow_dirty_read (bool | None): If `True`, allows the Coordinator to ask any |
| 1125 | + shard replica for the data, not only the shard leader. This may result |
| 1126 | + in “dirty reads”. This setting decides about dirty reads for the entire |
| 1127 | + transaction. Individual read operations, that are performed as part of |
| 1128 | + the transaction, cannot override it. |
| 1129 | + skip_fast_lock_round (bool | None): Whether to disable fast locking for |
| 1130 | + write operations. |
| 1131 | +
|
| 1132 | + Returns: |
| 1133 | + TransactionDatabase: Database API wrapper specifically tailored for |
| 1134 | + transactions. |
| 1135 | +
|
| 1136 | + Raises: |
| 1137 | + TransactionInitError: If the operation fails on the server side. |
| 1138 | + """ |
| 1139 | + collections = dict() |
| 1140 | + if read is not None: |
| 1141 | + collections["read"] = read |
| 1142 | + if write is not None: |
| 1143 | + collections["write"] = write |
| 1144 | + if exclusive is not None: |
| 1145 | + collections["exclusive"] = exclusive |
| 1146 | + |
| 1147 | + data: Json = dict(collections=collections) |
| 1148 | + if wait_for_sync is not None: |
| 1149 | + data["waitForSync"] = wait_for_sync |
| 1150 | + if allow_implicit is not None: |
| 1151 | + data["allowImplicit"] = allow_implicit |
| 1152 | + if lock_timeout is not None: |
| 1153 | + data["lockTimeout"] = lock_timeout |
| 1154 | + if max_transaction_size is not None: |
| 1155 | + data["maxTransactionSize"] = max_transaction_size |
| 1156 | + if skip_fast_lock_round is not None: |
| 1157 | + data["skipFastLockRound"] = skip_fast_lock_round |
| 1158 | + |
| 1159 | + headers = dict() |
| 1160 | + if allow_dirty_read is not None: |
| 1161 | + headers["x-arango-allow-dirty-read"] = str(allow_dirty_read).lower() |
| 1162 | + |
| 1163 | + request = Request( |
| 1164 | + method=Method.POST, |
| 1165 | + endpoint="/_api/transaction/begin", |
| 1166 | + data=self.serializer.dumps(data), |
| 1167 | + headers=headers, |
| 1168 | + ) |
| 1169 | + |
| 1170 | + def response_handler(resp: Response) -> str: |
| 1171 | + if not resp.is_success: |
| 1172 | + raise TransactionInitError(resp, request) |
| 1173 | + result: Json = self.deserializer.loads(resp.raw_body)["result"] |
| 1174 | + return cast(str, result["id"]) |
| 1175 | + |
| 1176 | + transaction_id = await self._executor.execute(request, response_handler) |
| 1177 | + return TransactionDatabase(self.connection, transaction_id) |
| 1178 | + |
| 1179 | + def fetch_transaction(self, transaction_id: str) -> "TransactionDatabase": |
| 1180 | + """Fetch an existing transaction. |
| 1181 | +
|
| 1182 | + Args: |
| 1183 | + transaction_id (str): Transaction ID. |
| 1184 | +
|
| 1185 | + Returns: |
| 1186 | + TransactionDatabase: Database API wrapper specifically tailored for |
| 1187 | + transactions. |
| 1188 | + """ |
| 1189 | + return TransactionDatabase(self.connection, transaction_id) |
| 1190 | + |
| 1191 | + async def list_transactions(self) -> Result[Jsons]: |
| 1192 | + """List all currently running stream transactions. |
| 1193 | +
|
| 1194 | + Returns: |
| 1195 | + list: List of transactions, with each transaction containing |
| 1196 | + an "id" and a "state" field. |
| 1197 | +
|
| 1198 | + Raises: |
| 1199 | + TransactionListError: If the operation fails on the server side. |
| 1200 | + """ |
| 1201 | + request = Request(method=Method.GET, endpoint="/_api/transaction") |
| 1202 | + |
| 1203 | + def response_handler(resp: Response) -> Jsons: |
| 1204 | + if not resp.is_success: |
| 1205 | + raise TransactionListError(resp, request) |
| 1206 | + result: Json = self.deserializer.loads(resp.raw_body) |
| 1207 | + return cast(Jsons, result["transactions"]) |
| 1208 | + |
| 1209 | + return await self._executor.execute(request, response_handler) |
| 1210 | + |
| 1211 | + |
| 1212 | +class TransactionDatabase(Database): |
| 1213 | + """Database API tailored specifically for |
| 1214 | + `Stream Transactions <https://docs.arangodb.com/stable/develop/http-api/transactions/stream-transactions/>`__. |
| 1215 | +
|
| 1216 | + It allows you start a transaction, run multiple operations (eg. AQL queries) over a short period of time, |
| 1217 | + and then commit or abort the transaction. |
| 1218 | +
|
| 1219 | + See :func:`arangoasync.database.StandardDatabase.begin_transaction`. |
| 1220 | +
|
| 1221 | + Args: |
| 1222 | + connection (Connection): Connection object to be used by the API executor. |
| 1223 | + transaction_id (str): Transaction ID. |
| 1224 | + """ # noqa: E501 |
| 1225 | + |
| 1226 | + def __init__(self, connection: Connection, transaction_id: str) -> None: |
| 1227 | + super().__init__(TransactionApiExecutor(connection, transaction_id)) |
| 1228 | + self._standard_executor = DefaultApiExecutor(connection) |
| 1229 | + self._transaction_id = transaction_id |
| 1230 | + |
| 1231 | + def __repr__(self) -> str: |
| 1232 | + return f"<TransactionDatabase {self.name}>" |
| 1233 | + |
| 1234 | + @property |
| 1235 | + def transaction_id(self) -> str: |
| 1236 | + """Transaction ID.""" |
| 1237 | + return self._transaction_id |
| 1238 | + |
| 1239 | + async def transaction_status(self) -> str: |
| 1240 | + """Get the status of the transaction. |
| 1241 | +
|
| 1242 | + Returns: |
| 1243 | + str: Transaction status: one of "running", "committed" or "aborted". |
| 1244 | +
|
| 1245 | + Raises: |
| 1246 | + TransactionStatusError: If the transaction is not found. |
| 1247 | + """ |
| 1248 | + request = Request( |
| 1249 | + method=Method.GET, |
| 1250 | + endpoint=f"/_api/transaction/{self.transaction_id}", |
| 1251 | + ) |
| 1252 | + |
| 1253 | + def response_handler(resp: Response) -> str: |
| 1254 | + if not resp.is_success: |
| 1255 | + raise TransactionStatusError(resp, request) |
| 1256 | + result: Json = self.deserializer.loads(resp.raw_body)["result"] |
| 1257 | + return cast(str, result["status"]) |
| 1258 | + |
| 1259 | + return await self._executor.execute(request, response_handler) |
| 1260 | + |
| 1261 | + async def commit_transaction(self) -> None: |
| 1262 | + """Commit the transaction. |
| 1263 | +
|
| 1264 | + Raises: |
| 1265 | + TransactionCommitError: If the operation fails on the server side. |
| 1266 | + """ |
| 1267 | + request = Request( |
| 1268 | + method=Method.PUT, |
| 1269 | + endpoint=f"/_api/transaction/{self.transaction_id}", |
| 1270 | + ) |
| 1271 | + |
| 1272 | + def response_handler(resp: Response) -> None: |
| 1273 | + if not resp.is_success: |
| 1274 | + raise TransactionCommitError(resp, request) |
| 1275 | + |
| 1276 | + await self._executor.execute(request, response_handler) |
| 1277 | + |
| 1278 | + async def abort_transaction(self) -> None: |
| 1279 | + """Abort the transaction.""" |
| 1280 | + request = Request( |
| 1281 | + method=Method.DELETE, |
| 1282 | + endpoint=f"/_api/transaction/{self.transaction_id}", |
| 1283 | + ) |
| 1284 | + |
| 1285 | + def response_handler(resp: Response) -> None: |
| 1286 | + if not resp.is_success: |
| 1287 | + raise TransactionAbortError(resp, request) |
| 1288 | + |
| 1289 | + await self._executor.execute(request, response_handler) |
0 commit comments