Skip to content

Introduce API Redesign #763

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
7043662
Introduce API Redesign
bigmontz Jul 20, 2022
3db7a70
Add execute
bigmontz Jul 20, 2022
c4d2c89
QueryResult as a named tuple
bigmontz Jul 20, 2022
e0c879c
it's automatic
bigmontz Jul 20, 2022
d0293e7
Add transaction.query to async
bigmontz Jul 21, 2022
5d52bab
Add Session.execute to async
bigmontz Jul 21, 2022
26b19a9
Add AsyncSession.query
bigmontz Jul 21, 2022
e3b1223
Add AsyncDriver.execute()
bigmontz Jul 21, 2022
5759926
Add AsyncDriver.query
bigmontz Jul 21, 2022
7fe9204
re-generate sync driver
bigmontz Jul 21, 2022
7896b06
Apply suggestions from code review
bigmontz Jul 21, 2022
9de9924
Apply suggestions
bigmontz Jul 21, 2022
2c7004c
apply more review suggestions
bigmontz Jul 21, 2022
62fb7ae
apply more review suggestions
bigmontz Jul 21, 2022
056ac54
docs
bigmontz Jul 21, 2022
915875d
Apply suggestions from code review
bigmontz Jul 21, 2022
7af3bd9
Apply suggestions from code review
bigmontz Jul 21, 2022
b6fcfb2
Apply code suggestions
bigmontz Jul 21, 2022
7fb2afd
Apply suggestions from code review
bigmontz Jul 22, 2022
6d5e550
Apply code review suggestions
bigmontz Jul 22, 2022
ed575f9
code suggestions
bigmontz Jul 22, 2022
8bb4498
Extracting known params to the method signatures
bigmontz Jul 22, 2022
b64b3c8
Reformat
bigmontz Jul 22, 2022
cc875d2
parameters=parameters
bigmontz Jul 22, 2022
0e8cbfc
parameters=parameters 2
bigmontz Jul 22, 2022
ecf966f
Apply suggestions from code review
bigmontz Jul 22, 2022
0ad6459
Single param tx function and extracting parameters
bigmontz Jul 25, 2022
a8d541f
Docs
bigmontz Jul 26, 2022
d147e62
code-style + run make-unasync
robsdedude Aug 9, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 150 additions & 0 deletions neo4j/_async/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
)
from ..addressing import Address
from ..api import (
CLUSTER_AUTO_ACCESS,
READ_ACCESS,
TRUST_ALL_CERTIFICATES,
TRUST_SYSTEM_CA_SIGNED_CERTIFICATES,
Expand Down Expand Up @@ -392,6 +393,155 @@ async def supports_multi_db(self):
await session._connect(READ_ACCESS)
return session._connection.supports_multiple_databases

async def query(
self, query, parameters=None, database=None,
cluster_member_access=CLUSTER_AUTO_ACCESS, skip_records=False,
metadata=None, timeout=None, **kwargs
):
"""
Run a Cypher query within an managed transaction.

The query is sent and the result header received
immediately and the :class:`neo4j.QueryResult`is
fetched.

For more usage details, see :meth:`.AsyncTransaction.query`.

For auto-commit queries, use :meth:`AsyncSession.run`.

For access to the :class:`neo4j.AsyncResult` object,
use :meth:`AsyncDriver.execute` and :meth:`.AsyncTransaction.run`

:param query: cypher query
:type query: str, neo4j.Query

:param parameters: dictionary of parameters
:type parameters: dict

:param database: the name of the database to be used
:type database: str

:param cluster_member_access: the kind of cluster member used
for running the work

:param metadata:
a dictionary with metadata.
For more usage details,
see :meth:`.AsyncSession.begin_transaction`.
:type metadata: dict

:param timeout:
the transaction timeout in seconds.
For more usage details,
see :meth:`.AsyncSession.begin_transaction`.
:type timeout: int

:param kwargs: additional keyword parameters

:returns: a new :class:`neo4j.QueryResult` object
:rtype: QueryResult
"""

async with self.session(database=database) as session:
return await session.query(
query,
parameters=parameters,
cluster_member_access=cluster_member_access,
skip_records=skip_records,
timeout=timeout,
metadata=metadata,
**kwargs
)

async def execute(
self, transaction_function,
database=None, cluster_member_access=CLUSTER_AUTO_ACCESS,
metadata=None, timeout=None
):
"""Execute a unit of work in a managed transaction.

This transaction will automatically be committed unless an exception
is thrown during query execution or by the user code.
Note, that this function perform retries and that the supplied
``transaction_function`` might get invoked more than once.

Example::

async def do_cypher_tx(tx, cypher):
records, _ = await tx.query(cypher)
return records

values = await driver.execute(lambda tx: do_cypher_tx(tx, "RETURN 1 AS x"))

Example::

async def do_cypher_tx(tx):
records, _ = await tx.query("RETURN 1 AS x")
return records

values = await driver.execute(
do_cypher_tx,
database="neo4j",
cluster_member_access=neo4j.api.CLUSTER_READERS_ACCESS
)

Example::

async def get_two_tx(tx):
result = await tx.run("UNWIND [1,2,3,4] AS x RETURN x")
values = []
async for record in result:
if len(values) >= 2:
break
values.append(record.values())
# or shorter: values = [record.values()
# for record in await result.fetch(2)]

# discard the remaining records if there are any
summary = await result.consume()
# use the summary for logging etc.
return values


values = await driver.execute(get_two_tx)

:param transaction_function: a function that takes a transaction as an
argument and does work with the transaction.
``transaction_function(tx)`` where ``tx`` is a
:class:`.Transaction`.

:param parameters: dictionary of parameters
:type parameters: dict

:param database: the name of the database to be used
:type database: str

:param cluster_member_access: the kind of cluster member used
for running the work

:param metadata:
a dictionary with metadata.
For more usage details,
see :meth:`.AsyncSession.begin_transaction`.
:type metadata: dict

:param timeout:
the transaction timeout in seconds.
For more usage details,
see :meth:`.AsyncSession.begin_transaction`.
:type timeout: int

:return: a result as returned by the given unit of work
"""

async with self.session(database=database) as session:
return await session.execute(
transaction_function,
cluster_member_access=cluster_member_access,
timeout=timeout,
metadata=metadata
)


class AsyncBoltDriver(_Direct, AsyncDriver):
""":class:`.AsyncBoltDriver` is instantiated for ``bolt`` URIs and
Expand Down
6 changes: 6 additions & 0 deletions neo4j/_async/io/_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,9 @@ async def acquire(
self.address, deadline, liveness_check_timeout
)

def is_direct(self):
return True


class AsyncNeo4jPool(AsyncIOPool):
""" Connection pool with routing table.
Expand Down Expand Up @@ -789,3 +792,6 @@ def on_write_failure(self, address):
for database in self.routing_tables.keys():
self.routing_tables[database].writers.discard(address)
log.debug("[#0000] C: <ROUTING> table=%r", self.routing_tables)

def is_direct(self):
return False
3 changes: 3 additions & 0 deletions neo4j/_async/work/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from collections import deque
from warnings import warn
from collections import namedtuple

from ..._async_compat.util import AsyncUtil
from ..._codec.hydration import BrokenHydrationObject
Expand Down Expand Up @@ -707,3 +708,5 @@ def closed(self):
.. versionadded:: 5.0
"""
return self._out_of_scope or self._consumed

QueryResult = namedtuple("QueryResult", ("records", "summary"))
Loading