Skip to content
Closed
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
7043662
Introduce API Redesign
bigmontz Jul 20, 2022
3db7a70
Add execute
bigmontz Jul 20, 2022
c4d2c89
QueryResult as a named tuple
bigmontz Jul 20, 2022
e0c879c
it's automatic
bigmontz Jul 20, 2022
d0293e7
Add transaction.query to async
bigmontz Jul 21, 2022
5d52bab
Add Session.execute to async
bigmontz Jul 21, 2022
26b19a9
Add AsyncSession.query
bigmontz Jul 21, 2022
e3b1223
Add AsyncDriver.execute()
bigmontz Jul 21, 2022
5759926
Add AsyncDriver.query
bigmontz Jul 21, 2022
7fe9204
re-generate sync driver
bigmontz Jul 21, 2022
7896b06
Apply suggestions from code review
bigmontz Jul 21, 2022
9de9924
Apply suggestions
bigmontz Jul 21, 2022
2c7004c
apply more review suggestions
bigmontz Jul 21, 2022
62fb7ae
apply more review suggestions
bigmontz Jul 21, 2022
056ac54
docs
bigmontz Jul 21, 2022
915875d
Apply suggestions from code review
bigmontz Jul 21, 2022
7af3bd9
Apply suggestions from code review
bigmontz Jul 21, 2022
b6fcfb2
Apply code suggestions
bigmontz Jul 21, 2022
7fb2afd
Apply suggestions from code review
bigmontz Jul 22, 2022
6d5e550
Apply code review suggestions
bigmontz Jul 22, 2022
ed575f9
code suggestions
bigmontz Jul 22, 2022
8bb4498
Extracting known params to the method signatures
bigmontz Jul 22, 2022
b64b3c8
Reformat
bigmontz Jul 22, 2022
cc875d2
parameters=parameters
bigmontz Jul 22, 2022
0e8cbfc
parameters=parameters 2
bigmontz Jul 22, 2022
ecf966f
Apply suggestions from code review
bigmontz Jul 22, 2022
0ad6459
Single param tx function and extracting parameters
bigmontz Jul 25, 2022
a8d541f
Docs
bigmontz Jul 26, 2022
d147e62
code-style + run make-unasync
robsdedude Aug 9, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions neo4j/_async/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
)
from ..addressing import Address
from ..api import (
CLUSTER_AUTO_ACCESS,
READ_ACCESS,
TRUST_ALL_CERTIFICATES,
TRUST_SYSTEM_CA_SIGNED_CERTIFICATES,
Expand Down Expand Up @@ -392,6 +393,113 @@ async def supports_multi_db(self):
await session._connect(READ_ACCESS)
return session._connection.supports_multiple_databases

async def query(self, query, parameters=None,
                database=None,
                cluster_member_access=CLUSTER_AUTO_ACCESS,
                skip_records=False,
                **kwargs):
    """
    Run a Cypher query within a managed transaction.

    A session is opened for ``database``, the query is delegated to that
    session's ``query`` method and the resulting
    :class:`neo4j.QueryResult` is returned once the session context has
    been exited.

    For more usage details, see :meth:`.AsyncTransaction.query`.

    For auto-commit queries, use :meth:`AsyncSession.run`.

    For access to the :class:`neo4j.AsyncResult` object,
    use :meth:`AsyncDriver.execute` and :meth:`.AsyncTransaction.run`.

    :param query: cypher query
    :type query: str, neo4j.Query
    :param parameters: dictionary of parameters
    :type parameters: dict
    :param database: forwarded as ``database`` when opening the session
        (``None`` by default)
    :param cluster_member_access: forwarded unchanged to the session's
        ``query`` method; defaults to ``CLUSTER_AUTO_ACCESS``
    :param skip_records: forwarded unchanged to the session's ``query``
        method; defaults to ``False``
    :type skip_records: bool
    :param kwargs: additional keyword parameters

    :returns: a new :class:`neo4j.QueryResult` object
    :rtype: QueryResult
    """

    async with self.session(database=database) as session:
        # ``parameters`` is passed by keyword so that it can never be
        # confused with one of the additional keyword parameters.
        return await session.query(
            query,
            parameters=parameters,
            cluster_member_access=cluster_member_access,
            skip_records=skip_records,
            **kwargs
        )

async def execute(self, transaction_function, *args,
                  database=None,
                  cluster_member_access=CLUSTER_AUTO_ACCESS,
                  **kwargs):
    """Execute a unit of work in a managed transaction.

    This transaction will automatically be committed unless an exception
    is thrown during query execution or by the user code.
    Note that this function performs retries and that the supplied
    ``transaction_function`` might get invoked more than once.

    Example::

        async def do_cypher_tx(tx, cypher):
            records, _ = await tx.query(cypher)
            return records

        values = await driver.execute(do_cypher_tx, "RETURN 1 AS x")

    Example::

        async def do_cypher_tx(tx):
            records, _ = await tx.query("RETURN 1 AS x")
            return records

        values = await driver.execute(
            do_cypher_tx,
            database="neo4j",
            cluster_member_access=neo4j.api.CLUSTER_READERS_ACCESS
        )

    Example::

        async def get_two_tx(tx):
            result = await tx.run("UNWIND [1,2,3,4] AS x RETURN x")
            values = []
            async for record in result:
                if len(values) >= 2:
                    break
                values.append(record.values())
            # or shorter: values = [record.values()
            #                       for record in await result.fetch(2)]

            # discard the remaining records if there are any
            summary = await result.consume()
            # use the summary for logging etc.
            return values


        values = await driver.execute(get_two_tx)

    :param transaction_function: a function that takes a transaction as an
        argument and does work with the transaction.
        ``transaction_function(tx, *args, **kwargs)`` where ``tx`` is a
        :class:`.Transaction`.
    :param args: arguments for the ``transaction_function``
    :param database: keyword-only; forwarded as ``database`` when opening
        the session. It is consumed here and is **not** passed to
        ``transaction_function``.
    :param cluster_member_access: keyword-only; forwarded to the
        session's ``execute`` method. It is consumed by the driver and is
        **not** passed to ``transaction_function``. Defaults to
        ``CLUSTER_AUTO_ACCESS``.
    :param kwargs: key word arguments for the ``transaction_function``

    :return: a result as returned by the given unit of work
    """

    # The session only lives for the duration of this single unit of work.
    async with self.session(database=database) as session:
        return await session.execute(
            transaction_function,
            *args,
            cluster_member_access=cluster_member_access,
            **kwargs
        )


class AsyncBoltDriver(_Direct, AsyncDriver):
""":class:`.AsyncBoltDriver` is instantiated for ``bolt`` URIs and
Expand Down
6 changes: 6 additions & 0 deletions neo4j/_async/io/_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,9 @@ async def acquire(
self.address, deadline, liveness_check_timeout
)

def is_direct(self):
    """Return ``True``: this pool type identifies itself as direct."""
    return True


class AsyncNeo4jPool(AsyncIOPool):
""" Connection pool with routing table.
Expand Down Expand Up @@ -789,3 +792,6 @@ def on_write_failure(self, address):
for database in self.routing_tables.keys():
self.routing_tables[database].writers.discard(address)
log.debug("[#0000] C: <ROUTING> table=%r", self.routing_tables)

def is_direct(self):
    """Return ``False``: this pool type does not identify itself as direct."""
    return False
3 changes: 3 additions & 0 deletions neo4j/_async/work/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from collections import deque
from warnings import warn
from collections import namedtuple

from ..._async_compat.util import AsyncUtil
from ..._codec.hydration import BrokenHydrationObject
Expand Down Expand Up @@ -707,3 +708,5 @@ def closed(self):
.. versionadded:: 5.0
"""
return self._out_of_scope or self._consumed

# Named tuple with the two fields ``records`` and ``summary`` (in that order),
# so it can be unpacked as ``records, summary = ...`` or accessed by name.
QueryResult = namedtuple("QueryResult", ("records", "summary"))
128 changes: 127 additions & 1 deletion neo4j/_async/work/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,26 @@
)
from ...api import (
Bookmarks,
CLUSTER_AUTO_ACCESS,
CLUSTER_READERS_ACCESS,
CLUSTER_WRITERS_ACCESS,
READ_ACCESS,
WRITE_ACCESS,
)
from ...exceptions import (
ClientError,
ConfigurationError,
DriverError,
Neo4jError,
ServiceUnavailable,
SessionExpired,
TransactionError,
)
from ...work import Query
from .result import AsyncResult
from .result import (
AsyncResult,
QueryResult,
)
from .transaction import (
AsyncManagedTransaction,
AsyncTransaction,
Expand Down Expand Up @@ -239,6 +246,125 @@ async def run(self, query, parameters=None, **kwargs):

return self._auto_result

async def query(self, query, parameters=None,
                cluster_member_access=CLUSTER_AUTO_ACCESS,
                skip_records=False,
                **kwargs):
    """
    Run a Cypher query within a managed transaction.

    The query is sent and the full result is fetched and returned as
    :class:`neo4j.QueryResult`.

    For more usage details, see :meth:`.AsyncTransaction.query`.

    For auto-commit queries, use :meth:`AsyncSession.run`.

    For access to the :class:`neo4j.AsyncResult` object,
    use :meth:`AsyncSession.execute` and :meth:`.AsyncTransaction.run`.

    :param query: cypher query
    :type query: str, neo4j.Query
    :param parameters: dictionary of parameters
    :type parameters: dict
    :param cluster_member_access: forwarded to :meth:`execute`, which
        uses it to pick the access mode; defaults to
        ``CLUSTER_AUTO_ACCESS``
    :param skip_records: if true, the result is consumed without
        collecting records and the returned ``records`` is an empty
        list; defaults to ``False``
    :type skip_records: bool
    :param kwargs: additional keyword parameters

    :returns: a new :class:`neo4j.QueryResult` object
    :rtype: QueryResult
    """

    async def job(tx, **job_kwargs):
        # Unit of work handed to ``execute``; ``job_kwargs`` are the
        # caller's additional keyword parameters.
        if not skip_records:
            return await tx.query(
                query, parameters=parameters, **job_kwargs
            )
        # Records are not wanted: consume the result straight away and
        # only keep the summary.
        result = await tx.run(query, parameters=parameters, **job_kwargs)
        summary = await result.consume()
        return QueryResult([], summary)

    return await self.execute(
        job,
        cluster_member_access=cluster_member_access,
        **kwargs
    )

async def execute(self, transaction_function, *args,
cluster_member_access=CLUSTER_AUTO_ACCESS,
**kwargs):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. I don't like this design. Here is the problem: if you write a wrapper function that does not control the signature of transaction_function, you are screwed, because you can't make it right in the general case because transaction_function could have a keyword argument cluster_member_access. What now?

This could be addressed like so

    async def execute(
        self, transaction_function,
        transaction_function_args=None, transaction_function_kwargs=None,
        cluster_member_access=CLUSTER_AUTO_ACCESS
    ):

which is admittedly less nice to use. There is the third option

    async def execute(
        self, transaction_function, *args,
        transaction_function_args=None, transaction_function_kwargs=None,
        cluster_member_access=CLUSTER_AUTO_ACCESS, **kwargs
    ):

But I'm not sure if that's not just more confusing than it's helping. I really don't know. I think this is the major downside of cramming all our API into easy to access functions... They become bloated, more complicated, and have to do many things at once.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cluster_member_access was already not being passed to the tx function in the previous version, and it is of little use in that context, since none of the tx methods accepts this argument.

I've tried to keep the usage simple while keeping it similar to the previous tx function. The difference is that the parameter which will be removed is now documented.

An alternative is removing the extra arguments (*args and **kwargs); this way the tx function will only receive the transaction.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand. Isn't transaction_function a function defined by user-level code (outside of the driver)? How can you know it doesn't have a cluster_member_access parameter?

Copy link
Contributor Author

@bigmontz bigmontz Jul 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't know. Session.run also has the same problem: the user can't have a query parameter called parameters (if they pass it like session.run('return n, parameters', n=1, parameters='a')).

We could also always add this name params to the tx_func kwargs. Which is kind of the approach used by to extract metadata and timeout in read_transaction.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While this is true about the session.run, if I was to write a wrapper, I'd just never call session.run(query, **parameters) but instead session.run(query, parameters=parameters). That way, I can be sure that my wrapper can cope with any query parameters thrown at it.

We could also always add this name params to the tx_func kwargs. Which is kind of the approach used by to extract metadata and timeout in read_transaction.

I can't follow, sorry.

Copy link
Contributor Author

@bigmontz bigmontz Jul 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

read_transaction and write_transaction get timeout and metadata from the **kwargs without removing them. We could do the same with cluster_member_access (and database, in the driver.execute case) by putting it back into the **kwargs. However, we would still have the issue with the documentation of the params.

For solving this, I'm suggesting keep the signature like:

async def execute(
        self, transaction_function, *args,
        cluster_member_access=None, **kwargs
    ):

Then, in the execute implementation do something like:

# user doesn't set anything
if cluster_member_access is None:
    cluster_member_access = CLUSTER_AUTO_ACCESS
# user set something
else:
    kwargs['cluster_member_access'] = cluster_member_access

The problem is: we don't know if the user set named param cluster_member_access or not, we only can guess if they are using the default value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is still another issue: the user could use a different meaning and dataset for the cluster_member_access variable.

"""Execute a unit of work in a managed transaction.

This transaction will automatically be committed unless an exception
is thrown during query execution or by the user code.
Note that this function performs retries and that the supplied
``transaction_function`` might get invoked more than once.


Example::

async def do_cypher_tx(tx, cypher):
records, _ = await tx.query(cypher)
return records

async with driver.session() as session:
values = await session.execute(do_cypher_tx, "RETURN 1 AS x")

Example::

async def do_cypher_tx(tx):
records, _ = await tx.query("RETURN 1 AS x")
return records

async with driver.session() as session:
values = await session.execute(
do_cypher_tx,
cluster_member_access=neo4j.api.CLUSTER_READERS_ACCESS
)

Example::

async def get_two_tx(tx):
result = await tx.run("UNWIND [1,2,3,4] AS x RETURN x")
values = []
async for record in result:
if len(values) >= 2:
break
values.append(record.values())
# or shorter: values = [record.values()
# for record in await result.fetch(2)]

# discard the remaining records if there are any
summary = await result.consume()
# use the summary for logging etc.
return values

async with driver.session() as session:
values = await session.execute(get_two_tx)

:param transaction_function: a function that takes a transaction as an
argument and does work with the transaction.
``transaction_function(tx, *args, **kwargs)`` where ``tx`` is a
:class:`.Transaction`.
:param args: arguments for the ``transaction_function``
:param kwargs: key word arguments for the ``transaction_function``

:return: a result as returned by the given unit of work
"""
if cluster_member_access == CLUSTER_AUTO_ACCESS:
if await self._supports_auto_routing():
access_mode = READ_ACCESS
else:
raise ConfigurationError(
"Server does not support CLUSTER_AUTO_ACCESS"
)
elif cluster_member_access == CLUSTER_READERS_ACCESS:
access_mode = READ_ACCESS
elif cluster_member_access == CLUSTER_WRITERS_ACCESS:
access_mode = WRITE_ACCESS
else:
raise ClientError("Invalid cluster_member_access")

return await self._run_transaction(
access_mode, transaction_function, *args, **kwargs
)

@deprecated(
"`last_bookmark` has been deprecated in favor of `last_bookmarks`. "
"This method can lead to unexpected behaviour."
Expand Down
40 changes: 39 additions & 1 deletion neo4j/_async/work/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
from ...exceptions import TransactionError
from ...work import Query
from ..io import ConnectionErrorHandler
from .result import AsyncResult
from .result import (
AsyncResult,
QueryResult,
)


__all__ = ("AsyncTransaction", "AsyncManagedTransaction")
Expand Down Expand Up @@ -131,6 +134,41 @@ async def run(self, query, parameters=None, **kwparameters):

return result

async def query(self, query, parameters=None, **kwparameters):
    """ Run a Cypher query within the context of this transaction.

    Cypher is typically expressed as a query template plus a
    set of named parameters. In Python, parameters may be expressed
    through a dictionary of parameters, through individual parameter
    arguments, or as a mixture of both. For example, the `query`
    calls below are all equivalent::

        >>> query = "CREATE (a:Person { name: $name, age: $age })"
        >>> query_result = await tx.query(query, {"name": "Alice", "age": 33})
        >>> query_result = await tx.query(query, {"name": "Alice"}, age=33)
        >>> query_result = await tx.query(query, name="Alice", age=33)

    Parameter values can be of any type supported by the Neo4j type
    system. In Python, this includes :class:`bool`, :class:`int`,
    :class:`str`, :class:`list` and :class:`dict`. Note however that
    :class:`list` properties must be homogeneous.

    :param query: cypher query
    :type query: str
    :param parameters: dictionary of parameters
    :type parameters: dict
    :param kwparameters: additional keyword parameters

    :returns: the result of the query, as a ``(records, summary)``
        named tuple
    :rtype: :class:`neo4j.QueryResult`

    :raise TransactionError: if the transaction is already closed
    """
    result = await self.run(query, parameters, **kwparameters)
    # Collect the records first, then consume the result to obtain the
    # summary.
    records = await AsyncUtil.list(result)
    summary = await result.consume()
    return QueryResult(records, summary)

async def _commit(self):
"""Mark this transaction as successful and close in order to trigger a COMMIT.

Expand Down
13 changes: 13 additions & 0 deletions neo4j/_async/work/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import asyncio

from neo4j.api import READ_ACCESS

from ..._conf import WorkspaceConfig
from ..._deadline import Deadline
from ..._meta import (
Expand Down Expand Up @@ -125,6 +127,17 @@ async def _disconnect(self, sync=False):
self._connection = None
self._connection_access_mode = None

async def _supports_auto_routing(self):
if self._pool.is_direct():
return True

await self._connect(READ_ACCESS)
supports_auto_routing = self._connection.configuration_hints.get(
"server_side_routing", False
)
await self._disconnect()
return supports_auto_routing

async def close(self):
if self._closed:
return
Expand Down
Loading