Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
7043662
Introduce API Redesign
bigmontz Jul 20, 2022
3db7a70
Add execute
bigmontz Jul 20, 2022
c4d2c89
QueryResult as a named tuple
bigmontz Jul 20, 2022
e0c879c
it's automatic
bigmontz Jul 20, 2022
d0293e7
Add transaction.query to async
bigmontz Jul 21, 2022
5d52bab
Add Session.execute to async
bigmontz Jul 21, 2022
26b19a9
Add AsyncSession.query
bigmontz Jul 21, 2022
e3b1223
Add AsyncDriver.execute()
bigmontz Jul 21, 2022
5759926
Add AsyncDriver.query
bigmontz Jul 21, 2022
7fe9204
re-generate sync driver
bigmontz Jul 21, 2022
7896b06
Apply suggestions from code review
bigmontz Jul 21, 2022
9de9924
Apply suggestions
bigmontz Jul 21, 2022
2c7004c
apply more review suggestions
bigmontz Jul 21, 2022
62fb7ae
apply more review suggestions
bigmontz Jul 21, 2022
056ac54
docs
bigmontz Jul 21, 2022
915875d
Apply suggestions from code review
bigmontz Jul 21, 2022
7af3bd9
Apply suggestions from code review
bigmontz Jul 21, 2022
b6fcfb2
Apply code suggestions
bigmontz Jul 21, 2022
7fb2afd
Apply suggestions from code review
bigmontz Jul 22, 2022
6d5e550
Apply code review suggestions
bigmontz Jul 22, 2022
ed575f9
code suggestions
bigmontz Jul 22, 2022
8bb4498
Extracting known params to the method signatures
bigmontz Jul 22, 2022
b64b3c8
Reformat
bigmontz Jul 22, 2022
cc875d2
parameters=parameters
bigmontz Jul 22, 2022
0e8cbfc
parameters=parameters 2
bigmontz Jul 22, 2022
ecf966f
Apply suggestions from code review
bigmontz Jul 22, 2022
0ad6459
Single param tx function and extracting parameters
bigmontz Jul 25, 2022
a8d541f
Docs
bigmontz Jul 26, 2022
d147e62
code-style + run make-unasync
robsdedude Aug 9, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 25 additions & 12 deletions neo4j/_async/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
)
from ..addressing import Address
from ..api import (
CLUSTER_AUTO_ACCESS,
READ_ACCESS,
TRUST_ALL_CERTIFICATES,
TRUST_SYSTEM_CA_SIGNED_CERTIFICATES,
Expand Down Expand Up @@ -392,7 +393,11 @@ async def supports_multi_db(self):
await session._connect(READ_ACCESS)
return session._connection.supports_multiple_databases

async def query(self, query, parameters=None, **kwargs):
async def query(self, query, parameters=None,
database=None,
cluster_member_access=CLUSTER_AUTO_ACCESS,
skip_records=False,
**kwargs):
"""
Run a Cypher query within an managed transaction.

Expand All @@ -416,14 +421,20 @@ async def query(self, query, parameters=None, **kwargs):
:returns: a new :class:`neo4j.QueryResult` object
:rtype: QueryResult
"""
session_kwargs = {}
if "database" in kwargs:
session_kwargs["database"] = kwargs.pop("database")

async with self.session(**session_kwargs) as session:
return await session.query(query, parameters, **kwargs)
async with self.session(database=database) as session:
return await session.query(
query,
parameters,
cluster_member_access=cluster_member_access,
skip_records=skip_records,
**kwargs
)

async def execute(self, transaction_function, *args, **kwargs):
async def execute(self, transaction_function, *args,
database=None,
cluster_member_access=CLUSTER_AUTO_ACCESS,
**kwargs):
"""Execute a unit of work in a managed transaction.

This transaction will automatically be committed unless an exception
Expand Down Expand Up @@ -480,12 +491,14 @@ async def get_two_tx(tx):

:return: a result as returned by the given unit of work
"""
session_kwargs = {}
if "database" in kwargs:
session_kwargs["database"] = kwargs.pop("database")

async with self.session(**session_kwargs) as session:
return await session.execute(transaction_function, *args, **kwargs)
async with self.session(database=database) as session:
return await session.execute(
transaction_function,
*args,
cluster_member_access=cluster_member_access,
**kwargs
)


class AsyncBoltDriver(_Direct, AsyncDriver):
Expand Down
20 changes: 12 additions & 8 deletions neo4j/_async/work/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,10 @@ async def run(self, query, parameters=None, **kwargs):

return self._auto_result

async def query(self, query, parameters=None, **kwargs):
async def query(self, query, parameters=None,
cluster_member_access=CLUSTER_AUTO_ACCESS,
skip_records=False,
**kwargs):
"""
Run a Cypher query within an managed transaction.

Expand All @@ -269,7 +272,6 @@ async def query(self, query, parameters=None, **kwargs):
:returns: a new :class:`neo4j.QueryResult` object
:rtype: QueryResult
"""
skip_records = kwargs.pop("skip_records", False)

async def job(tx, **job_kwargs):
if skip_records:
Expand All @@ -278,9 +280,15 @@ async def job(tx, **job_kwargs):
return QueryResult([], summary)
return await tx.query(query, parameters, **job_kwargs)

return await self.execute(job, **kwargs)
return await self.execute(
job,
cluster_member_access=cluster_member_access,
**kwargs
)

async def execute(self, transaction_function, *args, **kwargs):
async def execute(self, transaction_function, *args,
cluster_member_access=CLUSTER_AUTO_ACCESS,
**kwargs):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. I don't like this design. Here is the problem: if you write a wrapper function that does not control the signature of transaction_function, you are screwed, because you can't make it right in the general case because transaction_function could have a keyword argument cluster_member_access. What now?

This could be addressed like so

    async def execute(
        self, transaction_function,
        transaction_function_args=None, transaction_function_kwargs=None,
        cluster_member_access=CLUSTER_AUTO_ACCESS
    ):

which is admittedly less nice to use. There is the third option

    async def execute(
        self, transaction_function, *args,
        transaction_function_args=None, transaction_function_kwargs=None,
        cluster_member_access=CLUSTER_AUTO_ACCESS, **kwargs
    ):

But I'm not sure if that's not just more confusing than it's helping. I really don't know. I think this is the major downside of cramming all our API into easy to access functions... They become bloated, more complicated, and have to do many things at once.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cluster_member_access was already being not passed to the tx function in the previous version and it is kind of useless in that context, since none of the tx methods accept this argument.

I've tried to keep usage quite simple while keep the usage quite similar to the previous tx function. The difference is now the param which will be removed is documented.

An alternative is removing the extra arguments (*args and *kwargs), this way the tx function will only receive the transaction.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand. Isn't transaction_function a function defined by user-level code (outside of the driver)? How can you know it doesn't have a cluster_member_access parameter?

Copy link
Contributor Author

@bigmontz bigmontz Jul 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't know. The Session.run also have the same problem, the user couldn't have a parameter called parameters (if they set the param like session.run('return n, parameters', n=1, parameters='a')).

We could also always add this name params to the tx_func kwargs. Which is kind of the approach used by to extract metadata and timeout in read_transaction.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While this is true about the session.run, if I was to write a wrapper, I'd just never call session.run(query, **parameters) but instead session.run(query, parameters=parameters). That way, I can be sure that my wrapper can cope with any query parameters thrown at it.

We could also always add this name params to the tx_func kwargs. Which is kind of the approach used by to extract metadata and timeout in read_transaction.

I can't follow, sorry.

Copy link
Contributor Author

@bigmontz bigmontz Jul 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

read_transaction and write_transaction get timeout and metadata from the **kwargs without remove it from it. We could do it with the cluster_member_access (and database, in the driver.execute case) by put it back to the **kwargs. However, we will have the issue with the documentation of the params.

For solving this, I'm suggesting keep the signature like:

async def execute(
        self, transaction_function, *args,
        cluster_member_access=None, **kwargs
    ):

Then, in the execute implementation do something like:

# user doesn't set anything
if cluster_member_access is None:
    cluster_member_access = CLUSTER_AUTO_ACCESS
# user set something
else:
    kwargs['cluster_member_access'] = cluster_member_access

The problem is: we don't know if the user set named param cluster_member_access or not, we only can guess if they are using the default value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is still another issue: the user could use a different meaning and dataset for the cluster_member_access variable.

"""Execute a unit of work in a managed transaction.

This transaction will automatically be committed unless an exception
Expand Down Expand Up @@ -339,10 +347,6 @@ async def get_two_tx(tx):

:return: a result as returned by the given unit of work
"""
cluster_member_access = kwargs.pop(
"cluster_member_access", CLUSTER_AUTO_ACCESS
)

if cluster_member_access == CLUSTER_AUTO_ACCESS:
if await self._supports_auto_routing():
access_mode = READ_ACCESS
Expand Down
37 changes: 25 additions & 12 deletions neo4j/_sync/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
)
from ..addressing import Address
from ..api import (
CLUSTER_AUTO_ACCESS,
READ_ACCESS,
TRUST_ALL_CERTIFICATES,
TRUST_SYSTEM_CA_SIGNED_CERTIFICATES,
Expand Down Expand Up @@ -392,7 +393,11 @@ def supports_multi_db(self):
session._connect(READ_ACCESS)
return session._connection.supports_multiple_databases

def query(self, query, parameters=None, **kwargs):
def query(self, query, parameters=None,
database=None,
cluster_member_access=CLUSTER_AUTO_ACCESS,
skip_records=False,
**kwargs):
"""
Run a Cypher query within an managed transaction.

Expand All @@ -416,14 +421,20 @@ def query(self, query, parameters=None, **kwargs):
:returns: a new :class:`neo4j.QueryResult` object
:rtype: QueryResult
"""
session_kwargs = {}
if "database" in kwargs:
session_kwargs["database"] = kwargs.pop("database")

with self.session(**session_kwargs) as session:
return session.query(query, parameters, **kwargs)
with self.session(database=database) as session:
return session.query(
query,
parameters,
cluster_member_access=cluster_member_access,
skip_records=skip_records,
**kwargs
)

def execute(self, transaction_function, *args, **kwargs):
def execute(self, transaction_function, *args,
database=None,
cluster_member_access=CLUSTER_AUTO_ACCESS,
**kwargs):
"""Execute a unit of work in a managed transaction.

This transaction will automatically be committed unless an exception
Expand Down Expand Up @@ -480,12 +491,14 @@ def get_two_tx(tx):

:return: a result as returned by the given unit of work
"""
session_kwargs = {}
if "database" in kwargs:
session_kwargs["database"] = kwargs.pop("database")

with self.session(**session_kwargs) as session:
return session.execute(transaction_function, *args, **kwargs)
with self.session(database=database) as session:
return session.execute(
transaction_function,
*args,
cluster_member_access=cluster_member_access,
**kwargs
)


class BoltDriver(_Direct, Driver):
Expand Down
20 changes: 12 additions & 8 deletions neo4j/_sync/work/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,10 @@ def run(self, query, parameters=None, **kwargs):

return self._auto_result

def query(self, query, parameters=None, **kwargs):
def query(self, query, parameters=None,
cluster_member_access=CLUSTER_AUTO_ACCESS,
skip_records=False,
**kwargs):
"""
Run a Cypher query within an managed transaction.

Expand All @@ -269,7 +272,6 @@ def query(self, query, parameters=None, **kwargs):
:returns: a new :class:`neo4j.QueryResult` object
:rtype: QueryResult
"""
skip_records = kwargs.pop("skip_records", False)

def job(tx, **job_kwargs):
if skip_records:
Expand All @@ -278,9 +280,15 @@ def job(tx, **job_kwargs):
return QueryResult([], summary)
return tx.query(query, parameters, **job_kwargs)

return self.execute(job, **kwargs)
return self.execute(
job,
cluster_member_access=cluster_member_access,
**kwargs
)

def execute(self, transaction_function, *args, **kwargs):
def execute(self, transaction_function, *args,
cluster_member_access=CLUSTER_AUTO_ACCESS,
**kwargs):
"""Execute a unit of work in a managed transaction.

This transaction will automatically be committed unless an exception
Expand Down Expand Up @@ -339,10 +347,6 @@ def get_two_tx(tx):

:return: a result as returned by the given unit of work
"""
cluster_member_access = kwargs.pop(
"cluster_member_access", CLUSTER_AUTO_ACCESS
)

if cluster_member_access == CLUSTER_AUTO_ACCESS:
if self._supports_auto_routing():
access_mode = READ_ACCESS
Expand Down