Skip to content

Commit 88cbd27

Browse files
committed
Introduce (Async)ManagedTransaction
Transaction functions (a.k.a. managed transactions): The first argument of transaction functions is now a `(Async)ManagedTransaction` object. It behaves exactly like a regular `(Async)Transaction` object, except it does not offer the `commit`, `rollback`, `close`, and `closed` methods. Those methods would have caused a hard to interpreted error previously. Hence, they have been removed.
1 parent 06f0767 commit 88cbd27

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+891
-682
lines changed

CHANGELOG.md

+6
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,12 @@
8383
They are now ignored and will be removed in a future release.
8484
- The undocumented return value has been removed. If you need information
8585
about the remote server, use `driver.get_server_info()` instead.
86+
- Transaction functions (a.k.a. managed transactions):
87+
The first argument of transaction functions is now a `ManagedTransaction`
88+
object. It behaves exactly like a regular `Transaction` object, except it
89+
does not offer the `commit`, `rollback`, `close`, and `closed` methods.
90+
Those methods would have caused a hard to interpreted error previously. Hence,
91+
they have been removed.
8692

8793

8894
## Version 4.4

bin/make-unasync

+1
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ def apply_unasync(files):
214214
"_async": "_sync",
215215
"mark_async_test": "mark_sync_test",
216216
"assert_awaited_once": "assert_called_once",
217+
"assert_awaited_once_with": "assert_called_once_with",
217218
}
218219
additional_testkit_backend_replacements = {}
219220
rules = [

docs/source/api.rst

+12-5
Original file line numberDiff line numberDiff line change
@@ -422,16 +422,18 @@ Will result in:
422422
***********************
423423
Sessions & Transactions
424424
***********************
425-
All database activity is co-ordinated through two mechanisms: the :class:`neo4j.Session` and the :class:`neo4j.Transaction`.
425+
All database activity is co-ordinated through two mechanisms:
426+
**sessions** (:class:`neo4j.AsyncSession`) and **transactions**
427+
(:class:`neo4j.Transaction`, :class:`neo4j.ManagedTransaction`).
426428

427-
A :class:`neo4j.Session` is a logical container for any number of causally-related transactional units of work.
429+
A **session** is a logical container for any number of causally-related transactional units of work.
428430
Sessions automatically provide guarantees of causal consistency within a clustered environment but multiple sessions can also be causally chained if required.
429431
Sessions provide the top level of containment for database activity.
430432
Session creation is a lightweight operation and *sessions are not thread safe*.
431433

432434
Connections are drawn from the :class:`neo4j.Driver` connection pool as required.
433435

434-
A :class:`neo4j.Transaction` is a unit of work that is either committed in its entirety or is rolled back on failure.
436+
A **transaction** is a unit of work that is either committed in its entirety or is rolled back on failure.
435437

436438

437439
.. _session-construction-ref:
@@ -724,7 +726,6 @@ Example:
724726
node_id = create_person_node(tx)
725727
set_person_name(tx, node_id, name)
726728
tx.commit()
727-
tx.close()
728729
729730
def create_person_node(tx):
730731
query = "CREATE (a:Person { name: $name }) RETURN id(a) AS node_id"
@@ -753,6 +754,12 @@ This function is called one or more times, within a configurable time limit, unt
753754
Results should be fully consumed within the function and only aggregate or status values should be returned.
754755
Returning a live result object would prevent the driver from correctly managing connections and would break retry guarantees.
755756

757+
This function will receive a :class:`neo4j.ManagedTransaction` object as its first parameter.
758+
759+
.. autoclass:: neo4j.ManagedTransaction
760+
761+
.. automethod:: run
762+
756763
Example:
757764

758765
.. code-block:: python
@@ -811,7 +818,7 @@ A :class:`neo4j.Result` is attached to an active connection, through a :class:`n
811818

812819
.. automethod:: closed
813820

814-
See https://neo4j.com/docs/driver-manual/current/cypher-workflow/#driver-type-mapping for more about type mapping.
821+
See https://neo4j.com/docs/python-manual/current/cypher-workflow/#python-driver-type-mapping for more about type mapping.
815822

816823

817824
Graph

docs/source/async_api.rst

+13-6
Original file line numberDiff line numberDiff line change
@@ -235,16 +235,18 @@ Will result in:
235235
*********************************
236236
AsyncSessions & AsyncTransactions
237237
*********************************
238-
All database activity is co-ordinated through two mechanisms: the :class:`neo4j.AsyncSession` and the :class:`neo4j.AsyncTransaction`.
238+
All database activity is co-ordinated through two mechanisms:
239+
**sessions** (:class:`neo4j.AsyncSession`) and **transactions**
240+
(:class:`neo4j.AsyncTransaction`, :class:`neo4j.AsyncManagedTransaction`).
239241

240-
A :class:`neo4j.AsyncSession` is a logical container for any number of causally-related transactional units of work.
242+
A **session** is a logical container for any number of causally-related transactional units of work.
241243
Sessions automatically provide guarantees of causal consistency within a clustered environment but multiple sessions can also be causally chained if required.
242244
Sessions provide the top level of containment for database activity.
243-
Session creation is a lightweight operation and *sessions cannot be shared between coroutines*.
245+
Session creation is a lightweight operation and *sessions are not thread safe*.
244246

245247
Connections are drawn from the :class:`neo4j.AsyncDriver` connection pool as required.
246248

247-
A :class:`neo4j.AsyncTransaction` is a unit of work that is either committed in its entirety or is rolled back on failure.
249+
A **transaction** is a unit of work that is either committed in its entirety or is rolled back on failure.
248250

249251

250252
.. _async-session-construction-ref:
@@ -417,7 +419,6 @@ Example:
417419
node_id = await create_person_node(tx)
418420
await set_person_name(tx, node_id, name)
419421
await tx.commit()
420-
await tx.close()
421422
422423
async def create_person_node(tx):
423424
query = "CREATE (a:Person { name: $name }) RETURN id(a) AS node_id"
@@ -447,6 +448,12 @@ This function is called one or more times, within a configurable time limit, unt
447448
Results should be fully consumed within the function and only aggregate or status values should be returned.
448449
Returning a live result object would prevent the driver from correctly managing connections and would break retry guarantees.
449450

451+
This function will receive a :class:`neo4j.AsyncManagedTransaction` object as its first parameter.
452+
453+
.. autoclass:: neo4j.AsyncManagedTransaction
454+
455+
.. automethod:: run
456+
450457
Example:
451458

452459
.. code-block:: python
@@ -505,4 +512,4 @@ A :class:`neo4j.AsyncResult` is attached to an active connection, through a :cla
505512

506513
.. automethod:: closed
507514

508-
See https://neo4j.com/docs/driver-manual/current/cypher-workflow/#driver-type-mapping for more about type mapping.
515+
See https://neo4j.com/docs/python-manual/current/cypher-workflow/#python-driver-type-mapping for more about type mapping.

neo4j/__init__.py

+4
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
"AsyncDriver",
2424
"AsyncGraphDatabase",
2525
"AsyncNeo4jDriver",
26+
"AsyncManagedTransaction",
2627
"AsyncResult",
2728
"AsyncSession",
2829
"AsyncTransaction",
@@ -42,6 +43,7 @@
4243
"IPv4Address",
4344
"IPv6Address",
4445
"kerberos_auth",
46+
"ManagedTransaction",
4547
"Neo4jDriver",
4648
"PoolConfig",
4749
"Query",
@@ -72,6 +74,7 @@
7274
AsyncNeo4jDriver,
7375
)
7476
from ._async.work import (
77+
AsyncManagedTransaction,
7578
AsyncResult,
7679
AsyncSession,
7780
AsyncTransaction,
@@ -83,6 +86,7 @@
8386
Neo4jDriver,
8487
)
8588
from ._sync.work import (
89+
ManagedTransaction,
8690
Result,
8791
Session,
8892
Transaction,

neo4j/_async/work/__init__.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,18 @@
1919
from .session import (
2020
AsyncResult,
2121
AsyncSession,
22-
AsyncTransaction,
2322
AsyncWorkspace,
2423
)
24+
from .transaction import (
25+
AsyncManagedTransaction,
26+
AsyncTransaction,
27+
)
2528

2629

2730
__all__ = [
2831
"AsyncResult",
2932
"AsyncSession",
3033
"AsyncTransaction",
34+
"AsyncManagedTransaction",
3135
"AsyncWorkspace",
3236
]

neo4j/_async/work/session.py

+14-9
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
# limitations under the License.
1717

1818

19-
import asyncio
2019
from logging import getLogger
2120
from random import random
2221
from time import perf_counter
@@ -44,7 +43,10 @@
4443
)
4544
from ...work import Query
4645
from .result import AsyncResult
47-
from .transaction import AsyncTransaction
46+
from .transaction import (
47+
AsyncManagedTransaction,
48+
AsyncTransaction,
49+
)
4850
from .workspace import AsyncWorkspace
4951

5052

@@ -157,8 +159,9 @@ async def close(self):
157159
self._state_failed = True
158160

159161
if self._transaction:
160-
if self._transaction.closed() is False:
161-
await self._transaction.rollback() # roll back the transaction if it is not closed
162+
if self._transaction._closed() is False:
163+
# roll back the transaction if it is not closed
164+
await self._transaction._rollback()
162165
self._transaction = None
163166

164167
try:
@@ -306,7 +309,7 @@ async def last_bookmarks(self):
306309
if self._auto_result:
307310
await self._auto_result.consume()
308311

309-
if self._transaction and self._transaction._closed:
312+
if self._transaction and self._transaction._closed():
310313
self._collect_bookmark(self._transaction._bookmark)
311314
self._transaction = None
312315

@@ -323,10 +326,10 @@ async def _transaction_error_handler(self, _):
323326
self._transaction = None
324327
await self._disconnect()
325328

326-
async def _open_transaction(self, *, access_mode, metadata=None,
329+
async def _open_transaction(self, *, tx_cls, access_mode, metadata=None,
327330
timeout=None):
328331
await self._connect(access_mode=access_mode)
329-
self._transaction = AsyncTransaction(
332+
self._transaction = tx_cls(
330333
self._connection, self._config.fetch_size,
331334
self._transaction_closed_handler,
332335
self._transaction_error_handler
@@ -372,6 +375,7 @@ async def begin_transaction(self, metadata=None, timeout=None):
372375
raise TransactionError("Explicit transaction already open")
373376

374377
await self._open_transaction(
378+
tx_cls=AsyncTransaction,
375379
access_mode=self._config.default_access_mode, metadata=metadata,
376380
timeout=timeout
377381
)
@@ -396,17 +400,18 @@ async def _run_transaction(
396400
while True:
397401
try:
398402
await self._open_transaction(
403+
tx_cls=AsyncManagedTransaction,
399404
access_mode=access_mode, metadata=metadata,
400405
timeout=timeout
401406
)
402407
tx = self._transaction
403408
try:
404409
result = await transaction_function(tx, *args, **kwargs)
405410
except Exception:
406-
await tx.close()
411+
await tx._close()
407412
raise
408413
else:
409-
await tx.commit()
414+
await tx._commit()
410415
except IncompleteCommit:
411416
raise
412417
except (ServiceUnavailable, SessionExpired) as error:

0 commit comments

Comments
 (0)