Skip to content
Closed
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
7043662
Introduce API Redesign
bigmontz Jul 20, 2022
3db7a70
Add execute
bigmontz Jul 20, 2022
c4d2c89
QueryResult as a named tuple
bigmontz Jul 20, 2022
e0c879c
it's automatic
bigmontz Jul 20, 2022
d0293e7
Add transaction.query to async
bigmontz Jul 21, 2022
5d52bab
Add Session.execute to async
bigmontz Jul 21, 2022
26b19a9
Add AsyncSession.query
bigmontz Jul 21, 2022
e3b1223
Add AsyncDriver.execute()
bigmontz Jul 21, 2022
5759926
Add AsyncDriver.query
bigmontz Jul 21, 2022
7fe9204
re-generate sync driver
bigmontz Jul 21, 2022
7896b06
Apply suggestions from code review
bigmontz Jul 21, 2022
9de9924
Apply suggestions
bigmontz Jul 21, 2022
2c7004c
apply more review suggestions
bigmontz Jul 21, 2022
62fb7ae
apply more review suggestions
bigmontz Jul 21, 2022
056ac54
docs
bigmontz Jul 21, 2022
915875d
Apply suggestions from code review
bigmontz Jul 21, 2022
7af3bd9
Apply suggestions from code review
bigmontz Jul 21, 2022
b6fcfb2
Apply code suggestions
bigmontz Jul 21, 2022
7fb2afd
Apply suggestions from code review
bigmontz Jul 22, 2022
6d5e550
Apply code review suggestions
bigmontz Jul 22, 2022
ed575f9
code suggestions
bigmontz Jul 22, 2022
8bb4498
Extracting known params to the method signatures
bigmontz Jul 22, 2022
b64b3c8
Reformat
bigmontz Jul 22, 2022
cc875d2
parameters=parameters
bigmontz Jul 22, 2022
0e8cbfc
parameters=parameters 2
bigmontz Jul 22, 2022
ecf966f
Apply suggestions from code review
bigmontz Jul 22, 2022
0ad6459
Single param tx function and extracting parameters
bigmontz Jul 25, 2022
a8d541f
Docs
bigmontz Jul 26, 2022
d147e62
code-style + run make-unasync
robsdedude Aug 9, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions neo4j/_async/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,108 @@ async def supports_multi_db(self):
await session._connect(READ_ACCESS)
return session._connection.supports_multiple_databases

async def query(self, query, parameters=None, **kwargs):
"""
Run a Cypher query within an managed transaction and
all the retries policy will be applied.

The query is sent and the result header received
immediately and the :class:`neo4j.QueryResult`is
fetched.

For more usage details, see :meth:`.AsyncTransaction.query`.

For auto-commit queries, use `AsyncSession.run`.

For access to the neo4j.AsyncResult object,
use `AsyncDriver.execute` and `.AsyncTransaction.run`

:param query: cypher query
:type query: str, neo4j.Query
:param parameters: dictionary of parameters
:type parameters: dict
:param kwargs: additional keyword parameters

:returns: a new :class:`neo4j.QueryResult` object
:rtype: QueryResult
"""
session_kwargs = {}
if "database" in kwargs:
Copy link
Member

@robsdedude robsdedude Jul 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The doc string doesn't mention anywhere that the database kwarg has a special function and will not get used as a query parameter. Maybe, database=None should be an explicit keyword argument, as in

async def query(self, query, parameters=None, database=None, **kwargs):

Tbf, this also applies to driver.session. It does not specify what config options there are, but at least there is an explicit list of available config options in the API docs https://github.com/neo4j/neo4j-python-driver/blob/5.0/docs/source/api.rst#session-configuration

session_kwargs["database"] = kwargs.pop("database")

async with self.session(**session_kwargs) as session:
return await session.query(query, parameters, **kwargs)

async def execute(self, transaction_function, *args, **kwargs):
"""Execute a unit of work in a managed transaction.

.. note::
This does not necessarily imply access control, see the session
configuration option :ref:`default-access-mode-ref`.

This transaction will automatically be committed unless an exception
is thrown during query execution or by the user code.
Note, that this function perform retries and that the supplied `
transaction_function` might get invoked more than once.

Managed transactions should not generally be explicitly committed
(via ``tx.commit()``).

Example::

async def do_cypher_tx(tx, cypher):
records, _ = await tx.query(cypher)
return records

values = await driver.execute(do_cypher_tx, "RETURN 1 AS x")

Example::

async def do_cypher_tx(tx):
records, _ = await tx.query("RETURN 1 AS x")
return records

values = await driver.execute(
do_cypher_tx,
database="neo4j",
cluster_member_access=neo4j.api.CLUSTER_READERS_ACCESS
)

Example::

async def get_two_tx(tx):
result = await tx.run("UNWIND [1,2,3,4] AS x RETURN x")
values = []
async for record in result:
if len(values) >= 2:
break
values.append(record.values())
# or shorter: values = [record.values()
# for record in await result.fetch(2)]

# discard the remaining records if there are any
summary = await result.consume()
# use the summary for logging etc.
return values


values = await driver.execute(get_two_tx)

:param transaction_function: a function that takes a transaction as an
argument and does work with the transaction.
``transaction_function(tx, *args, **kwargs)`` where ``tx`` is a
:class:`.Transaction`.
:param args: arguments for the ``transaction_function``
:param kwargs: key word arguments for the ``transaction_function``
:return: a result as returned by the given unit of work
"""
session_kwargs = {}
if "database" in kwargs:
session_kwargs["database"] = kwargs.pop("database")

async with self.session(**session_kwargs) as session:
return await session.execute(transaction_function, *args, **kwargs)


class AsyncBoltDriver(_Direct, AsyncDriver):
""":class:`.AsyncBoltDriver` is instantiated for ``bolt`` URIs and
Expand Down
6 changes: 6 additions & 0 deletions neo4j/_async/io/_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,9 @@ async def acquire(
self.address, deadline, liveness_check_timeout
)

def is_direct(self):
return True


class AsyncNeo4jPool(AsyncIOPool):
""" Connection pool with routing table.
Expand Down Expand Up @@ -789,3 +792,6 @@ def on_write_failure(self, address):
for database in self.routing_tables.keys():
self.routing_tables[database].writers.discard(address)
log.debug("[#0000] C: <ROUTING> table=%r", self.routing_tables)

def is_direct(self):
return False
3 changes: 3 additions & 0 deletions neo4j/_async/work/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from collections import deque
from warnings import warn
from collections import namedtuple

from ..._async_compat.util import AsyncUtil
from ..._codec.hydration import BrokenHydrationObject
Expand Down Expand Up @@ -707,3 +708,5 @@ def closed(self):
.. versionadded:: 5.0
"""
return self._out_of_scope or self._consumed

QueryResult = namedtuple("QueryResult", ("records", "summary"))
128 changes: 127 additions & 1 deletion neo4j/_async/work/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
)
from ...api import (
Bookmarks,
CLUSTER_AUTO_ACCESS,
CLUSTER_READERS_ACCESS,
CLUSTER_WRITERS_ACCESS,
READ_ACCESS,
WRITE_ACCESS,
)
Expand All @@ -40,7 +43,10 @@
TransactionError,
)
from ...work import Query
from .result import AsyncResult
from .result import (
AsyncResult,
QueryResult,
)
from .transaction import (
AsyncManagedTransaction,
AsyncTransaction,
Expand Down Expand Up @@ -239,6 +245,126 @@ async def run(self, query, parameters=None, **kwargs):

return self._auto_result

async def query(self, query, parameters=None, **kwargs):
"""
Run a Cypher query within an managed transaction and
all the retries policy will be applied.

The query is sent and the result header received
immediately and the :class:`neo4j.QueryResult`is
fetched.

For more usage details, see :meth:`.AsyncTransaction.query`.

For auto-commit queries, use `AsyncSession.run`.

For access to the neo4j.AsyncResult object,
use `AsyncSession.execute` and `.AsyncTransaction.run`

:param query: cypher query
:type query: str, neo4j.Query
:param parameters: dictionary of parameters
:type parameters: dict
:param kwargs: additional keyword parameters

:returns: a new :class:`neo4j.QueryResult` object
:rtype: QueryResult
"""
skip_records = kwargs.pop("skip_records", False)

async def job(tx, **job_kwargs):
if skip_records:
result = await tx.run(query, parameters, **job_kwargs)
summary = await result.consume()
return QueryResult([], summary)
return await tx.query(query, parameters, **job_kwargs)

return await self.execute(job, **kwargs)

async def execute(self, transaction_function, *args, **kwargs):
"""Execute a unit of work in a managed transaction.

.. note::
This does not necessarily imply access control, see the session
configuration option :ref:`default-access-mode-ref`.

This transaction will automatically be committed unless an exception
is thrown during query execution or by the user code.
Note, that this function perform retries and that the supplied
`transaction_function` might get invoked more than once.

Managed transactions should not generally be explicitly committed
(via ``tx.commit()``).

Example::

async def do_cypher_tx(tx, cypher):
records, _ = await tx.query(cypher)
return records

async with driver.session() as session:
values = session.execute(do_cypher_tx, "RETURN 1 AS x")

Example::

async def do_cypher_tx(tx):
records, _ = await tx.query("RETURN 1 AS x")
return records

async with driver.session() as session:
values = await session.execute(
do_cypher_tx,
cluster_member_access=neo4j.api.CLUSTER_READERS_ACCESS
)

Example::

async def get_two_tx(tx):
result = await tx.run("UNWIND [1,2,3,4] AS x RETURN x")
values = []
async for record in result:
if len(values) >= 2:
break
values.append(record.values())
# or shorter: values = [record.values()
# for record in await result.fetch(2)]

# discard the remaining records if there are any
summary = await result.consume()
# use the summary for logging etc.
return values

async with driver.session() as session:
values = await session.execute(get_two_tx)

:param transaction_function: a function that takes a transaction as an
argument and does work with the transaction.
``transaction_function(tx, *args, **kwargs)`` where ``tx`` is a
:class:`.Transaction`.
:param args: arguments for the ``transaction_function``
:param kwargs: key word arguments for the ``transaction_function``
:return: a result as returned by the given unit of work
"""
cluster_member_access = kwargs.pop(
"cluster_member_access", CLUSTER_AUTO_ACCESS
)

if cluster_member_access == CLUSTER_AUTO_ACCESS:
if await self._supports_auto_routing():
access_mode = READ_ACCESS
else:
raise ValueError("Server does not support CLUSTER_AUTO_ACCESS")
elif cluster_member_access == CLUSTER_READERS_ACCESS:
access_mode = READ_ACCESS
elif cluster_member_access == CLUSTER_WRITERS_ACCESS:
access_mode = WRITE_ACCESS
else:
raise ValueError("Invalid cluster_member_access")

return await self._run_transaction(
access_mode, transaction_function, *args, **kwargs
)

@deprecated(
"`last_bookmark` has been deprecated in favor of `last_bookmarks`. "
"This method can lead to unexpected behaviour."
Expand Down
42 changes: 41 additions & 1 deletion neo4j/_async/work/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
from ...exceptions import TransactionError
from ...work import Query
from ..io import ConnectionErrorHandler
from .result import AsyncResult
from .result import (
AsyncResult,
QueryResult,
)


__all__ = ("AsyncTransaction", "AsyncManagedTransaction")
Expand Down Expand Up @@ -131,6 +134,43 @@ async def run(self, query, parameters=None, **kwparameters):

return result

async def query(self, query, parameters=None, **kwparameters):
""" Run a Cypher query within the context of this transaction.

Cypher is typically expressed as a query template plus a
set of named parameters. In Python, parameters may be expressed
through a dictionary of parameters, through individual parameter
arguments, or as a mixture of both. For example, the `run`
queries below are all equivalent::

>>> query = "CREATE (a:Person { name: $name, age: $age })"
>>> query_result = await tx.query(query, {"name": "Alice", "age": 33})
>>> query_result = await tx.query(query, {"name": "Alice"}, age=33)
>>> query_result = await tx.query(query, name="Alice", age=33)

Parameter values can be of any type supported by the Neo4j type
system. In Python, this includes :class:`bool`, :class:`int`,
:class:`str`, :class:`list` and :class:`dict`. Note however that
:class:`list` properties must be homogenous.

:param query: cypher query
:type query: str
:param parameters: dictionary of parameters
:type parameters: dict
:param kwparameters: additional keyword parameters

:returns: a new :class:`neo4j.QueryResult` object
:rtype: :class:`neo4j.QueryResult`

:raise TransactionError: if the transaction is already closed
"""
result = await self.run(query, parameters, **kwparameters)
records = []
async for x in result:
records.append(x)
summary = await result.consume()
return QueryResult(records, summary)

async def _commit(self):
"""Mark this transaction as successful and close in order to trigger a COMMIT.

Expand Down
13 changes: 13 additions & 0 deletions neo4j/_async/work/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import asyncio

from neo4j.api import READ_ACCESS

from ..._conf import WorkspaceConfig
from ..._deadline import Deadline
from ..._meta import (
Expand Down Expand Up @@ -125,6 +127,17 @@ async def _disconnect(self, sync=False):
self._connection = None
self._connection_access_mode = None

async def _supports_auto_routing(self):
if self._pool.is_direct():
return True

await self._connect(READ_ACCESS)
supports_auto_routing = self._connection.configuration_hints.get(
"server_side_routing", False
)
await self._disconnect()
return supports_auto_routing

async def close(self):
if self._closed:
return
Expand Down
Loading