diff --git a/docs/source/api.rst b/docs/source/api.rst index 65d5e278b..21ae22ee3 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -153,7 +153,196 @@ Closing a driver will immediately shut down all connections in the pool. query, use :meth:`neo4j.Driver.verify_connectivity`. .. autoclass:: neo4j.Driver() - :members: session, encrypted, close, verify_connectivity, get_server_info + :members: session, query_bookmark_manager, encrypted, close, + verify_connectivity, get_server_info + + .. method:: execute_query(query, parameters_=None,routing_=neo4j.RoutingControl.WRITERS, database_=None, impersonated_user_=None, bookmark_manager_=self.query_bookmark_manager, result_transformer_=Result.to_eager_result, **kwargs) + + Execute a query in a transaction function and return all results. + + This method is a handy wrapper for lower-level driver APIs like + sessions, transactions, and transaction functions. It is intended + for simple use cases where there is no need for managing all possible + options. + + The internal usage of transaction functions provides a retry-mechanism + for appropriate errors. Furthermore, this means that queries using + ``CALL {} IN TRANSACTIONS`` or the older ``USING PERIODIC COMMIT`` + will not work (use :meth:`Session.run` for these). + + The method is roughly equivalent to:: + + def execute_query( + query_, parameters_, routing_, database_, impersonated_user_, + bookmark_manager_, result_transformer_, **kwargs + ): + def work(tx): + result = tx.run(query_, parameters_, **kwargs) + return result_transformer_(result) + + with driver.session( + database=database_, + impersonated_user=impersonated_user_, + bookmark_manager=bookmark_manager_, + ) as session: + if routing_ == RoutingControl.WRITERS: + return session.execute_write(work) + elif routing_ == RoutingControl.READERS: + return session.execute_read(work) + + Usage example:: + + from typing import List + + import neo4j + + + def example(driver: neo4j.Driver) -> List[str]: + \"""Get the name of all 42 year-olds.\""" + records, summary, keys = driver.execute_query( + "MATCH (p:Person {age: $age}) RETURN p.name", + {"age": 42}, + routing_=neo4j.RoutingControl.READERS, # or just "r" + database_="neo4j", + ) + assert keys == ["p.name"] # not needed, just for illustration + # log_summary(summary) # log some metadata + return [str(record["p.name"]) for record in records] + # or: return [str(record[0]) for record in records] + # or even: return list(map(lambda r: str(r[0]), records)) + + Another example:: + + import neo4j + + + def example(driver: neo4j.Driver) -> int: + \"""Call all young people "My dear" and get their count.\""" + record = driver.execute_query( + "MATCH (p:Person) WHERE p.age <= $age " + "SET p.nickname = 'My dear' " + "RETURN count(*)", + # optional routing parameter, as write is default + # routing_=neo4j.RoutingControl.WRITERS, # or just "w", + database_="neo4j", + result_transformer_=neo4j.Result.single, + age=15, + ) + assert record is not None # for typechecking and illustration + count = record[0] + assert isinstance(count, int) + return count + + :param query_: cypher query to execute + :type query_: typing.Optional[str] + :param parameters_: parameters to use in the query + :type parameters_: typing.Optional[typing.Dict[str, typing.Any]] + :param routing_: + whether to route the query to a reader (follower/read replica) or + a writer (leader) in the cluster. Default is to route to a writer. + :type routing_: neo4j.RoutingControl + :param database_: + database to execute the query against. + + None (default) uses the database configured on the server side. + + .. Note:: + It is recommended to always specify the database explicitly + when possible. This allows the driver to work more efficiently, + as it will not have to resolve the default database first. + + See also the Session config :ref:`database-ref`. + :type database_: typing.Optional[str] + :param impersonated_user_: + Name of the user to impersonate. + + This means that all query will be executed in the security context + of the impersonated user. For this, the user for which the + :class:`Driver` has been created needs to have the appropriate + permissions. + + See also the Session config :ref:`impersonated-user-ref`. + :type impersonated_user_: typing.Optional[str] + :param result_transformer_: + A function that gets passed the :class:`neo4j.Result` object + resulting from the query and converts it to a different type. The + result of the transformer function is returned by this method. + + .. warning:: + + The transformer function must **not** return the + :class:`neo4j.Result` itself. + + Example transformer that checks that exactly one record is in the + result stream, then returns the record and the result summary:: + + from typing import Tuple + + import neo4j + + + def transformer( + result: neo4j.Result + ) -> Tuple[neo4j.Record, neo4j.ResultSummary]: + record = result.single(strict=True) + summary = result.consume() + return record, summary + + Note that methods of :class:`neo4j.Result` that don't take + mandatory arguments can be used directly as transformer functions. + For example:: + + import neo4j + + + def example(driver: neo4j.Driver) -> neo4j.Record:: + record = driver.execute_query( + "SOME QUERY", + result_transformer_=neo4j.Result.single + ) + + + # is equivalent to: + + + def transformer(result: neo4j.Result) -> neo4j.Record: + return result.single() + + + def example(driver: neo4j.Driver) -> neo4j.Record:: + record = driver.execute_query( + "SOME QUERY", + result_transformer_=transformer + ) + + :type result_transformer_: + typing.Callable[[neo4j.Result], typing.Union[T]] + :param bookmark_manager_: + Specify a bookmark manager to use. + + If present, the bookmark manager is used to keep the query causally + consistent with all work executed using the same bookmark manager. + + Defaults to the driver's :attr:`.query_bookmark_manager`. + + Pass :const:`None` to disable causal consistency. + :type bookmark_manager_: + typing.Union[neo4j.BookmarkManager, neo4j.BookmarkManager, + None] + :param kwargs: additional keyword parameters. None of these can end + with a single underscore. This is to avoid collisions with the + keyword configuration parameters of this method. If you need to + pass such a parameter, use the ``parameters_`` parameter instead. + These take precedence over parameters passed as ``parameters_``. + :type kwargs: typing.Any + + :returns: the result of the ``result_transformer`` + :rtype: T + + **This is experimental.** (See :ref:`filter-warnings-ref`) + It might be changed or removed any time even without prior notice. + + .. versionadded:: 5.5 .. _driver-configuration-ref: @@ -974,11 +1163,22 @@ A :class:`neo4j.Result` is attached to an active connection, through a :class:`n .. automethod:: to_df + .. automethod:: to_eager_result + .. automethod:: closed See https://neo4j.com/docs/python-manual/current/cypher-workflow/#python-driver-type-mapping for more about type mapping. +*********** +EagerResult +*********** + +.. autoclass:: neo4j.EagerResult + :show-inheritance: + :members: + + Graph ===== @@ -1370,15 +1570,17 @@ BookmarkManager Constants, Enums, Helpers ************************* -.. autoclass:: neo4j.Address +.. autoclass:: neo4j.RoutingControl :show-inheritance: :members: +.. autoclass:: neo4j.Address + :show-inheritance: + :members: .. autoclass:: neo4j.IPv4Address() :show-inheritance: - .. autoclass:: neo4j.IPv6Address() :show-inheritance: diff --git a/docs/source/async_api.rst b/docs/source/async_api.rst index dc18d23b8..f92965d48 100644 --- a/docs/source/async_api.rst +++ b/docs/source/async_api.rst @@ -135,7 +135,197 @@ Closing a driver will immediately shut down all connections in the pool. query, use :meth:`neo4j.AsyncDriver.verify_connectivity`. .. autoclass:: neo4j.AsyncDriver() - :members: session, encrypted, close, verify_connectivity, get_server_info + :members: session, query_bookmark_manager, encrypted, close, + verify_connectivity, get_server_info + + .. method:: execute_query(query, parameters_=None, routing_=neo4j.RoutingControl.WRITERS, database_=None, impersonated_user_=None, bookmark_manager_=self.query_bookmark_manager, result_transformer_=AsyncResult.to_eager_result, **kwargs) + :async: + + Execute a query in a transaction function and return all results. + + This method is a handy wrapper for lower-level driver APIs like + sessions, transactions, and transaction functions. It is intended + for simple use cases where there is no need for managing all possible + options. + + The internal usage of transaction functions provides a retry-mechanism + for appropriate errors. Furthermore, this means that queries using + ``CALL {} IN TRANSACTIONS`` or the older ``USING PERIODIC COMMIT`` + will not work (use :meth:`AsyncSession.run` for these). + + The method is roughly equivalent to:: + + async def execute_query( + query_, parameters_, routing_, database_, impersonated_user_, + bookmark_manager_, result_transformer_, **kwargs + ): + async def work(tx): + result = await tx.run(query_, parameters_, **kwargs) + return await result_transformer_(result) + + async with driver.session( + database=database_, + impersonated_user=impersonated_user_, + bookmark_manager=bookmark_manager_, + ) as session: + if routing_ == RoutingControl.WRITERS: + return await session.execute_write(work) + elif routing_ == RoutingControl.READERS: + return await session.execute_read(work) + + Usage example:: + + from typing import List + + import neo4j + + + async def example(driver: neo4j.AsyncDriver) -> List[str]: + \"""Get the name of all 42 year-olds.\""" + records, summary, keys = await driver.execute_query( + "MATCH (p:Person {age: $age}) RETURN p.name", + {"age": 42}, + routing_=neo4j.RoutingControl.READERS, # or just "r" + database_="neo4j", + ) + assert keys == ["p.name"] # not needed, just for illustration + # log_summary(summary) # log some metadata + return [str(record["p.name"]) for record in records] + # or: return [str(record[0]) for record in records] + # or even: return list(map(lambda r: str(r[0]), records)) + + Another example:: + + import neo4j + + + async def example(driver: neo4j.AsyncDriver) -> int: + \"""Call all young people "My dear" and get their count.\""" + record = await driver.execute_query( + "MATCH (p:Person) WHERE p.age <= 15 " + "SET p.nickname = 'My dear' " + "RETURN count(*)", + # optional routing parameter, as write is default + # routing_=neo4j.RoutingControl.WRITERS, # or just "w", + database_="neo4j", + result_transformer_=neo4j.AsyncResult.single, + ) + assert record is not None # for typechecking and illustration + count = record[0] + assert isinstance(count, int) + return count + + :param query_: cypher query to execute + :type query_: typing.Optional[str] + :param parameters_: parameters to use in the query + :type parameters_: typing.Optional[typing.Dict[str, typing.Any]] + :param routing_: + whether to route the query to a reader (follower/read replica) or + a writer (leader) in the cluster. Default is to route to a writer. + :type routing_: neo4j.RoutingControl + :param database_: + database to execute the query against. + + None (default) uses the database configured on the server side. + + .. Note:: + It is recommended to always specify the database explicitly + when possible. This allows the driver to work more efficiently, + as it will not have to resolve the default database first. + + See also the Session config :ref:`database-ref`. + :type database_: typing.Optional[str] + :param impersonated_user_: + Name of the user to impersonate. + + This means that all query will be executed in the security context + of the impersonated user. For this, the user for which the + :class:`Driver` has been created needs to have the appropriate + permissions. + + See also the Session config :ref:`impersonated-user-ref`. + :type impersonated_user_: typing.Optional[str] + :param result_transformer_: + A function that gets passed the :class:`neo4j.AsyncResult` object + resulting from the query and converts it to a different type. The + result of the transformer function is returned by this method. + + .. warning:: + + The transformer function must **not** return the + :class:`neo4j.AsyncResult` itself. + + Example transformer that checks that exactly one record is in the + result stream, then returns the record and the result summary:: + + from typing import Tuple + + import neo4j + + + async def transformer( + result: neo4j.AsyncResult + ) -> Tuple[neo4j.Record, neo4j.ResultSummary]: + record = await result.single(strict=True) + summary = await result.consume() + return record, summary + + Note that methods of :class:`neo4j.AsyncResult` that don't take + mandatory arguments can be used directly as transformer functions. + For example:: + + import neo4j + + + async def example(driver: neo4j.AsyncDriver) -> neo4j.Record:: + record = await driver.execute_query( + "SOME QUERY", + result_transformer_=neo4j.AsyncResult.single + ) + + + # is equivalent to: + + + async def transformer(result: neo4j.AsyncResult) -> neo4j.Record: + return await result.single() + + + async def example(driver: neo4j.AsyncDriver) -> neo4j.Record:: + record = await driver.execute_query( + "SOME QUERY", + result_transformer_=transformer + ) + + :type result_transformer_: + typing.Callable[[neo4j.AsyncResult], typing.Awaitable[T]] + :param bookmark_manager_: + Specify a bookmark manager to use. + + If present, the bookmark manager is used to keep the query causally + consistent with all work executed using the same bookmark manager. + + Defaults to the driver's :attr:`.query_bookmark_manager`. + + Pass :const:`None` to disable causal consistency. + :type bookmark_manager_: + typing.Union[neo4j.AsyncBookmarkManager, neo4j.BookmarkManager, + None] + :param kwargs: additional keyword parameters. None of these can end + with a single underscore. This is to avoid collisions with the + keyword configuration parameters of this method. If you need to + pass such a parameter, use the ``parameters_`` parameter instead. + These take precedence over parameters passed as ``parameters_``. + :type kwargs: typing.Any + + :returns: the result of the ``result_transformer`` + :rtype: T + + + **This is experimental.** (See :ref:`filter-warnings-ref`) + It might be changed or removed any time even without prior notice. + + .. versionadded:: 5.5 .. _async-driver-configuration-ref: @@ -656,6 +846,8 @@ A :class:`neo4j.AsyncResult` is attached to an active connection, through a :cla .. automethod:: to_df + .. automethod:: to_eager_result + .. automethod:: closed See https://neo4j.com/docs/python-manual/current/cypher-workflow/#python-driver-type-mapping for more about type mapping. diff --git a/src/neo4j/__init__.py b/src/neo4j/__init__.py index 3483e537f..bd2ca184f 100644 --- a/src/neo4j/__init__.py +++ b/src/neo4j/__init__.py @@ -18,6 +18,7 @@ from logging import getLogger as _getLogger +from ._api import RoutingControl from ._async.driver import ( AsyncBoltDriver, AsyncDriver, @@ -59,6 +60,7 @@ Session, Transaction, ) +from ._work import EagerResult from .addressing import ( Address, IPv4Address, @@ -114,6 +116,7 @@ "custom_auth", "DEFAULT_DATABASE", "Driver", + "EagerResult", "ExperimentalWarning", "get_user_agent", "GraphDatabase", @@ -129,6 +132,7 @@ "Record", "Result", "ResultSummary", + "RoutingControl", "ServerInfo", "Session", "SessionConfig", diff --git a/src/neo4j/_api.py b/src/neo4j/_api.py new file mode 100644 index 000000000..f9ca49a63 --- /dev/null +++ b/src/neo4j/_api.py @@ -0,0 +1,56 @@ +# Copyright (c) "Neo4j" +# Neo4j Sweden AB [https://neo4j.com] +# +# This file is part of Neo4j. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from __future__ import annotations + +import typing as t +from enum import Enum + + +if t.TYPE_CHECKING: + import typing_extensions as te + + +class RoutingControl(str, Enum): + """Selection which cluster members to route a query connect to. + + Inherits from :class:`str` and :class:`Enum`. Hence, every driver API + accepting a :class:`.RoutingControl` value will also accept a string + + >>> RoutingControl.READERS == "r" + True + >>> RoutingControl.WRITERS == "w" + True + + **This is experimental.** + It might be changed or removed any time even without prior notice. + + .. seealso:: + :attr:`.AsyncDriver.execute_query`, :attr:`.Driver.execute_query` + + .. versionadded:: 5.5 + """ + READERS = "r" + WRITERS = "w" + + +if t.TYPE_CHECKING: + T_RoutingControl = t.Union[ + RoutingControl, + te.Literal["r", "w"], + ] diff --git a/src/neo4j/_async/driver.py b/src/neo4j/_async/driver.py index 114e1e6d7..06027c01f 100644 --- a/src/neo4j/_async/driver.py +++ b/src/neo4j/_async/driver.py @@ -20,6 +20,7 @@ import asyncio import typing as t +import warnings if t.TYPE_CHECKING: @@ -28,7 +29,7 @@ import ssl - +from .._api import RoutingControl from .._async_compat.util import AsyncUtil from .._conf import ( Config, @@ -42,8 +43,10 @@ deprecation_warn, experimental, experimental_warn, + ExperimentalWarning, unclosed_resource_warn, ) +from .._work import EagerResult from ..addressing import Address from ..api import ( AsyncBookmarkManager, @@ -72,7 +75,31 @@ TBmConsumer as _TBmConsumer, TBmSupplier as _TBmSupplier, ) -from .work import AsyncSession +from .work import ( + AsyncManagedTransaction, + AsyncResult, + AsyncSession, +) + + +if t.TYPE_CHECKING: + import ssl + from enum import Enum + + import typing_extensions as te + + from .._api import T_RoutingControl + + + class _DefaultEnum(Enum): + default = "default" + + _default = _DefaultEnum.default + +else: + _default = object() + +_T = t.TypeVar("_T") class AsyncGraphDatabase: @@ -410,6 +437,12 @@ def __init__(self, pool, default_workspace_config): assert default_workspace_config is not None self._pool = pool self._default_workspace_config = default_workspace_config + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", + message=r".*\bbookmark manager\b.*", + category=ExperimentalWarning) + self._query_bookmark_manager = \ + AsyncGraphDatabase.bookmark_manager() async def __aenter__(self) -> AsyncDriver: return self @@ -482,6 +515,314 @@ async def close(self) -> None: raise self._closed = True + # overloads to work around https://github.com/python/mypy/issues/3737 + @t.overload + async def execute_query( + self, + query_: str, + parameters_: t.Optional[t.Dict[str, t.Any]] = None, + routing_: T_RoutingControl = RoutingControl.WRITERS, + database_: t.Optional[str] = None, + impersonated_user_: t.Optional[str] = None, + bookmark_manager_: t.Union[ + AsyncBookmarkManager, BookmarkManager, None + ] = ..., + result_transformer_: t.Callable[ + [AsyncResult], t.Awaitable[EagerResult] + ] = ..., + **kwargs: t.Any + ) -> EagerResult: + ... + + @t.overload + async def execute_query( + self, + query_: str, + parameters_: t.Optional[t.Dict[str, t.Any]] = None, + routing_: T_RoutingControl = RoutingControl.WRITERS, + database_: t.Optional[str] = None, + impersonated_user_: t.Optional[str] = None, + bookmark_manager_: t.Union[ + AsyncBookmarkManager, BookmarkManager, None + ] = ..., + result_transformer_: t.Callable[ + [AsyncResult], t.Awaitable[_T] + ] = ..., + **kwargs: t.Any + ) -> _T: + ... + + @experimental( + "Driver.execute_query is experimental. " + "It might be changed or removed any time even without prior notice." + ) + async def execute_query( + self, + query_: str, + parameters_: t.Optional[t.Dict[str, t.Any]] = None, + routing_: T_RoutingControl = RoutingControl.WRITERS, + database_: t.Optional[str] = None, + impersonated_user_: t.Optional[str] = None, + bookmark_manager_: t.Union[ + AsyncBookmarkManager, BookmarkManager, None, + te.Literal[_DefaultEnum.default] + ] = _default, + result_transformer_: t.Callable[ + [AsyncResult], t.Awaitable[t.Any] + ] = AsyncResult.to_eager_result, + **kwargs: t.Any + ) -> t.Any: + """Execute a query in a transaction function and return all results. + + This method is a handy wrapper for lower-level driver APIs like + sessions, transactions, and transaction functions. It is intended + for simple use cases where there is no need for managing all possible + options. + + The internal usage of transaction functions provides a retry-mechanism + for appropriate errors. Furthermore, this means that queries using + ``CALL {} IN TRANSACTIONS`` or the older ``USING PERIODIC COMMIT`` + will not work (use :meth:`AsyncSession.run` for these). + + The method is roughly equivalent to:: + + async def execute_query( + query_, parameters_, routing_, database_, impersonated_user_, + bookmark_manager_, result_transformer_, **kwargs + ): + async def work(tx): + result = await tx.run(query_, parameters_, **kwargs) + return await result_transformer_(result) + + async with driver.session( + database=database_, + impersonated_user=impersonated_user_, + bookmark_manager=bookmark_manager_, + ) as session: + if routing_ == RoutingControl.WRITERS: + return await session.execute_write(work) + elif routing_ == RoutingControl.READERS: + return await session.execute_read(work) + + Usage example:: + + from typing import List + + import neo4j + + + async def example(driver: neo4j.AsyncDriver) -> List[str]: + \"""Get the name of all 42 year-olds.\""" + records, summary, keys = await driver.execute_query( + "MATCH (p:Person {age: $age}) RETURN p.name", + {"age": 42}, + routing_=neo4j.RoutingControl.READERS, # or just "r" + database_="neo4j", + ) + assert keys == ["p.name"] # not needed, just for illustration + # log_summary(summary) # log some metadata + return [str(record["p.name"]) for record in records] + # or: return [str(record[0]) for record in records] + # or even: return list(map(lambda r: str(r[0]), records)) + + Another example:: + + import neo4j + + + async def example(driver: neo4j.AsyncDriver) -> int: + \"""Call all young people "My dear" and get their count.\""" + record = await driver.execute_query( + "MATCH (p:Person) WHERE p.age <= 15 " + "SET p.nickname = 'My dear' " + "RETURN count(*)", + # optional routing parameter, as write is default + # routing_=neo4j.RoutingControl.WRITERS, # or just "w", + database_="neo4j", + result_transformer_=neo4j.AsyncResult.single, + ) + assert record is not None # for typechecking and illustration + count = record[0] + assert isinstance(count, int) + return count + + :param query_: cypher query to execute + :type query_: typing.Optional[str] + :param parameters_: parameters to use in the query + :type parameters_: typing.Optional[typing.Dict[str, typing.Any]] + :param routing_: + whether to route the query to a reader (follower/read replica) or + a writer (leader) in the cluster. Default is to route to a writer. + :type routing_: neo4j.RoutingControl + :param database_: + database to execute the query against. + + None (default) uses the database configured on the server side. + + .. Note:: + It is recommended to always specify the database explicitly + when possible. This allows the driver to work more efficiently, + as it will not have to resolve the default database first. + + See also the Session config :ref:`database-ref`. + :type database_: typing.Optional[str] + :param impersonated_user_: + Name of the user to impersonate. + + This means that all query will be executed in the security context + of the impersonated user. For this, the user for which the + :class:`Driver` has been created needs to have the appropriate + permissions. + + See also the Session config :ref:`impersonated-user-ref`. + :type impersonated_user_: typing.Optional[str] + :param result_transformer_: + A function that gets passed the :class:`neo4j.AsyncResult` object + resulting from the query and converts it to a different type. The + result of the transformer function is returned by this method. + + .. warning:: + + The transformer function must **not** return the + :class:`neo4j.AsyncResult` itself. + + Example transformer that checks that exactly one record is in the + result stream, then returns the record and the result summary:: + + from typing import Tuple + + import neo4j + + + async def transformer( + result: neo4j.AsyncResult + ) -> Tuple[neo4j.Record, neo4j.ResultSummary]: + record = await result.single(strict=True) + summary = await result.consume() + return record, summary + + Note that methods of :class:`neo4j.AsyncResult` that don't take + mandatory arguments can be used directly as transformer functions. + For example:: + + import neo4j + + + async def example(driver: neo4j.AsyncDriver) -> neo4j.Record:: + record = await driver.execute_query( + "SOME QUERY", + result_transformer_=neo4j.AsyncResult.single + ) + + + # is equivalent to: + + + async def transformer(result: neo4j.AsyncResult) -> neo4j.Record: + return await result.single() + + + async def example(driver: neo4j.AsyncDriver) -> neo4j.Record:: + record = await driver.execute_query( + "SOME QUERY", + result_transformer_=transformer + ) + + :type result_transformer_: + typing.Callable[[neo4j.AsyncResult], typing.Awaitable[T]] + :param bookmark_manager_: + Specify a bookmark manager to use. + + If present, the bookmark manager is used to keep the query causally + consistent with all work executed using the same bookmark manager. + + Defaults to the driver's :attr:`.query_bookmark_manager`. + + Pass :const:`None` to disable causal consistency. + :type bookmark_manager_: + typing.Union[neo4j.AsyncBookmarkManager, neo4j.BookmarkManager, + None] + :param kwargs: additional keyword parameters. None of these can end + with a single underscore. This is to avoid collisions with the + keyword configuration parameters of this method. If you need to + pass such a parameter, use the ``parameters_`` parameter instead. + These take precedence over parameters passed as ``parameters_``. + :type kwargs: typing.Any + + :returns: the result of the ``result_transformer`` + :rtype: T + + **This is experimental.** (See :ref:`filter-warnings-ref`) + It might be changed or removed any time even without prior notice. + + .. versionadded:: 5.5 + """ + invalid_kwargs = [k for k in kwargs if + k[-2:-1] != "_" and k[-1:] == "_"] + if invalid_kwargs: + raise ValueError( + "keyword parameters must not end with a single '_'. Found: %r" + "\nYou either misspelled an existing configuration parameter " + "or tried to send a query parameter that is reserved. In the " + "latter case, use the `parameters_` dictionary instead." + % invalid_kwargs + ) + parameters = dict(parameters_ or {}, **kwargs) + + if bookmark_manager_ is _default: + bookmark_manager_ = self._query_bookmark_manager + assert bookmark_manager_ is not _default + + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", + message=r".*\bbookmark_manager\b.*", + category=ExperimentalWarning) + session = self.session(database=database_, + impersonated_user=impersonated_user_, + bookmark_manager=bookmark_manager_) + async with session: + if routing_ == RoutingControl.WRITERS: + executor = session.execute_write + elif routing_ == RoutingControl.READERS: + executor = session.execute_read + else: + raise ValueError("Invalid routing control value: %r" + % routing_) + return await executor( + _work, query_, parameters, result_transformer_ + ) + + @property + @experimental( + "Driver.query_bookmark_manager is experimental. " + "It might be changed or removed any time even without prior notice." + ) + def query_bookmark_manager(self) -> AsyncBookmarkManager: + """The driver's default query bookmark manager. + + This is the default :class:`AsyncBookmarkManager` used by + :meth:`.execute_query`. This can be used to causally chain + :meth:`.execute_query` calls and sessions. Example:: + + async def example(driver: neo4j.AsyncDriver) -> None: + await driver.execute_query("") + async with driver.session( + bookmark_manager=driver.query_bookmark_manager + ) as session: + # every query inside this session will be causally chained + # (i.e., can read what was written by ) + await session.run("") + # subsequent execute_query calls will be causally chained + # (i.e., can read what was written by ) + await driver.execute_query("") + + **This is experimental.** (See :ref:`filter-warnings-ref`) + It might be changed or removed any time even without prior notice. + + .. versionadded:: 5.5 + """ + return self._query_bookmark_manager + if t.TYPE_CHECKING: async def verify_connectivity( @@ -627,6 +968,22 @@ async def supports_multi_db(self) -> bool: return session._connection.supports_multiple_databases +async def _work( + tx: AsyncManagedTransaction, + query: str, + parameters: t.Dict[str, t.Any], + transformer: t.Callable[[AsyncResult], t.Awaitable[_T]] +) -> _T: + res = await tx.run(query, parameters) + if transformer is AsyncResult.to_eager_result: + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", + message=r".*\bto_eager_result\b.*", + category=ExperimentalWarning) + return await transformer(res) + return await transformer(res) + + class AsyncBoltDriver(_Direct, AsyncDriver): """:class:`.AsyncBoltDriver` is instantiated for ``bolt`` URIs and addresses a single database machine. This may be a standalone server or diff --git a/src/neo4j/_async/work/__init__.py b/src/neo4j/_async/work/__init__.py index 264e6cb99..e769bdd85 100644 --- a/src/neo4j/_async/work/__init__.py +++ b/src/neo4j/_async/work/__init__.py @@ -15,9 +15,11 @@ # See the License for the specific language governing permissions and # limitations under the License. - -from .session import ( +from .result import ( AsyncResult, + EagerResult, +) +from .session import ( AsyncSession, AsyncWorkspace, ) @@ -35,4 +37,5 @@ "AsyncTransaction", "AsyncTransactionBase", "AsyncWorkspace", + "EagerResult", ] diff --git a/src/neo4j/_async/work/result.py b/src/neo4j/_async/work/result.py index 9cfde69a9..5d17dd1af 100644 --- a/src/neo4j/_async/work/result.py +++ b/src/neo4j/_async/work/result.py @@ -19,7 +19,10 @@ from __future__ import annotations import typing as t -from collections import deque +from collections import ( + deque, + namedtuple, +) from warnings import warn @@ -35,6 +38,7 @@ RecordTableRowExporter, ) from ..._meta import experimental +from ..._work import EagerResult from ...exceptions import ( ResultConsumedError, ResultNotSingleError, @@ -600,6 +604,35 @@ async def data(self, *keys: _TResultKey) -> t.List[t.Dict[str, t.Any]]: """ return [record.data(*keys) async for record in self] + @experimental( + "Result.to_eager_result is experimental. " + "It might be changed or removed any time even without prior notice." + ) + async def to_eager_result(self) -> EagerResult: + """Convert this result to an :class:`.EagerResult`. + + This method exhausts the result and triggers a :meth:`.consume`. + + :returns: all remaining records in the result stream, the result's + summary, and keys as an :class:`.EagerResult` instance. + + :raises ResultConsumedError: if the transaction from which this result + was obtained has been closed or the Result has been explicitly + consumed. + + **This is experimental.** (See :ref:`filter-warnings-ref`) + It might be changed or removed any time even without prior notice. + + .. versionadded:: 5.5 + """ + + await self._buffer_all() + return EagerResult( + keys=list(self.keys()), + records=await AsyncUtil.list(self), + summary=await self.consume() + ) + async def to_df( self, expand: bool = False, @@ -730,9 +763,9 @@ async def to_df( df[dt_columns] = df[dt_columns].apply( lambda col: col.map( lambda x: - pd.Timestamp(x.iso_format()) - .replace(tzinfo=getattr(x, "tzinfo", None)) - if x else pd.NaT + pd.Timestamp(x.iso_format()) + .replace(tzinfo=getattr(x, "tzinfo", None)) + if x else pd.NaT ) ) return df diff --git a/src/neo4j/_sync/driver.py b/src/neo4j/_sync/driver.py index 9b64dbef2..9bc994e55 100644 --- a/src/neo4j/_sync/driver.py +++ b/src/neo4j/_sync/driver.py @@ -20,6 +20,7 @@ import asyncio import typing as t +import warnings if t.TYPE_CHECKING: @@ -27,6 +28,7 @@ import ssl +from .._api import RoutingControl from .._async_compat.util import Util from .._conf import ( Config, @@ -40,8 +42,10 @@ deprecation_warn, experimental, experimental_warn, + ExperimentalWarning, unclosed_resource_warn, ) +from .._work import EagerResult from ..addressing import Address from ..api import ( Auth, @@ -69,7 +73,31 @@ TBmConsumer as _TBmConsumer, TBmSupplier as _TBmSupplier, ) -from .work import Session +from .work import ( + ManagedTransaction, + Result, + Session, +) + + +if t.TYPE_CHECKING: + import ssl + from enum import Enum + + import typing_extensions as te + + from .._api import T_RoutingControl + + + class _DefaultEnum(Enum): + default = "default" + + _default = _DefaultEnum.default + +else: + _default = object() + +_T = t.TypeVar("_T") class GraphDatabase: @@ -407,6 +435,12 @@ def __init__(self, pool, default_workspace_config): assert default_workspace_config is not None self._pool = pool self._default_workspace_config = default_workspace_config + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", + message=r".*\bbookmark manager\b.*", + category=ExperimentalWarning) + self._query_bookmark_manager = \ + GraphDatabase.bookmark_manager() def __enter__(self) -> Driver: return self @@ -479,6 +513,314 @@ def close(self) -> None: raise self._closed = True + # overloads to work around https://github.com/python/mypy/issues/3737 + @t.overload + def execute_query( + self, + query_: str, + parameters_: t.Optional[t.Dict[str, t.Any]] = None, + routing_: T_RoutingControl = RoutingControl.WRITERS, + database_: t.Optional[str] = None, + impersonated_user_: t.Optional[str] = None, + bookmark_manager_: t.Union[ + BookmarkManager, BookmarkManager, None + ] = ..., + result_transformer_: t.Callable[ + [Result], t.Union[EagerResult] + ] = ..., + **kwargs: t.Any + ) -> EagerResult: + ... + + @t.overload + def execute_query( + self, + query_: str, + parameters_: t.Optional[t.Dict[str, t.Any]] = None, + routing_: T_RoutingControl = RoutingControl.WRITERS, + database_: t.Optional[str] = None, + impersonated_user_: t.Optional[str] = None, + bookmark_manager_: t.Union[ + BookmarkManager, BookmarkManager, None + ] = ..., + result_transformer_: t.Callable[ + [Result], t.Union[_T] + ] = ..., + **kwargs: t.Any + ) -> _T: + ... + + @experimental( + "Driver.execute_query is experimental. " + "It might be changed or removed any time even without prior notice." + ) + def execute_query( + self, + query_: str, + parameters_: t.Optional[t.Dict[str, t.Any]] = None, + routing_: T_RoutingControl = RoutingControl.WRITERS, + database_: t.Optional[str] = None, + impersonated_user_: t.Optional[str] = None, + bookmark_manager_: t.Union[ + BookmarkManager, BookmarkManager, None, + te.Literal[_DefaultEnum.default] + ] = _default, + result_transformer_: t.Callable[ + [Result], t.Union[t.Any] + ] = Result.to_eager_result, + **kwargs: t.Any + ) -> t.Any: + """Execute a query in a transaction function and return all results. + + This method is a handy wrapper for lower-level driver APIs like + sessions, transactions, and transaction functions. It is intended + for simple use cases where there is no need for managing all possible + options. + + The internal usage of transaction functions provides a retry-mechanism + for appropriate errors. Furthermore, this means that queries using + ``CALL {} IN TRANSACTIONS`` or the older ``USING PERIODIC COMMIT`` + will not work (use :meth:`Session.run` for these). + + The method is roughly equivalent to:: + + def execute_query( + query_, parameters_, routing_, database_, impersonated_user_, + bookmark_manager_, result_transformer_, **kwargs + ): + def work(tx): + result = tx.run(query_, parameters_, **kwargs) + return result_transformer_(result) + + with driver.session( + database=database_, + impersonated_user=impersonated_user_, + bookmark_manager=bookmark_manager_, + ) as session: + if routing_ == RoutingControl.WRITERS: + return session.execute_write(work) + elif routing_ == RoutingControl.READERS: + return session.execute_read(work) + + Usage example:: + + from typing import List + + import neo4j + + + def example(driver: neo4j.Driver) -> List[str]: + \"""Get the name of all 42 year-olds.\""" + records, summary, keys = driver.execute_query( + "MATCH (p:Person {age: $age}) RETURN p.name", + {"age": 42}, + routing_=neo4j.RoutingControl.READERS, # or just "r" + database_="neo4j", + ) + assert keys == ["p.name"] # not needed, just for illustration + # log_summary(summary) # log some metadata + return [str(record["p.name"]) for record in records] + # or: return [str(record[0]) for record in records] + # or even: return list(map(lambda r: str(r[0]), records)) + + Another example:: + + import neo4j + + + def example(driver: neo4j.Driver) -> int: + \"""Call all young people "My dear" and get their count.\""" + record = driver.execute_query( + "MATCH (p:Person) WHERE p.age <= 15 " + "SET p.nickname = 'My dear' " + "RETURN count(*)", + # optional routing parameter, as write is default + # routing_=neo4j.RoutingControl.WRITERS, # or just "w", + database_="neo4j", + result_transformer_=neo4j.Result.single, + ) + assert record is not None # for typechecking and illustration + count = record[0] + assert isinstance(count, int) + return count + + :param query_: cypher query to execute + :type query_: typing.Optional[str] + :param parameters_: parameters to use in the query + :type parameters_: typing.Optional[typing.Dict[str, typing.Any]] + :param routing_: + whether to route the query to a reader (follower/read replica) or + a writer (leader) in the cluster. Default is to route to a writer. + :type routing_: neo4j.RoutingControl + :param database_: + database to execute the query against. + + None (default) uses the database configured on the server side. + + .. Note:: + It is recommended to always specify the database explicitly + when possible. This allows the driver to work more efficiently, + as it will not have to resolve the default database first. + + See also the Session config :ref:`database-ref`. + :type database_: typing.Optional[str] + :param impersonated_user_: + Name of the user to impersonate. + + This means that all query will be executed in the security context + of the impersonated user. For this, the user for which the + :class:`Driver` has been created needs to have the appropriate + permissions. + + See also the Session config :ref:`impersonated-user-ref`. + :type impersonated_user_: typing.Optional[str] + :param result_transformer_: + A function that gets passed the :class:`neo4j.Result` object + resulting from the query and converts it to a different type. The + result of the transformer function is returned by this method. + + .. warning:: + + The transformer function must **not** return the + :class:`neo4j.Result` itself. + + Example transformer that checks that exactly one record is in the + result stream, then returns the record and the result summary:: + + from typing import Tuple + + import neo4j + + + def transformer( + result: neo4j.Result + ) -> Tuple[neo4j.Record, neo4j.ResultSummary]: + record = result.single(strict=True) + summary = result.consume() + return record, summary + + Note that methods of :class:`neo4j.Result` that don't take + mandatory arguments can be used directly as transformer functions. + For example:: + + import neo4j + + + def example(driver: neo4j.Driver) -> neo4j.Record:: + record = driver.execute_query( + "SOME QUERY", + result_transformer_=neo4j.Result.single + ) + + + # is equivalent to: + + + def transformer(result: neo4j.Result) -> neo4j.Record: + return result.single() + + + def example(driver: neo4j.Driver) -> neo4j.Record:: + record = driver.execute_query( + "SOME QUERY", + result_transformer_=transformer + ) + + :type result_transformer_: + typing.Callable[[neo4j.Result], typing.Union[T]] + :param bookmark_manager_: + Specify a bookmark manager to use. + + If present, the bookmark manager is used to keep the query causally + consistent with all work executed using the same bookmark manager. + + Defaults to the driver's :attr:`.query_bookmark_manager`. + + Pass :const:`None` to disable causal consistency. + :type bookmark_manager_: + typing.Union[neo4j.BookmarkManager, neo4j.BookmarkManager, + None] + :param kwargs: additional keyword parameters. None of these can end + with a single underscore. This is to avoid collisions with the + keyword configuration parameters of this method. If you need to + pass such a parameter, use the ``parameters_`` parameter instead. + These take precedence over parameters passed as ``parameters_``. + :type kwargs: typing.Any + + :returns: the result of the ``result_transformer`` + :rtype: T + + **This is experimental.** (See :ref:`filter-warnings-ref`) + It might be changed or removed any time even without prior notice. + + .. versionadded:: 5.5 + """ + invalid_kwargs = [k for k in kwargs if + k[-2:-1] != "_" and k[-1:] == "_"] + if invalid_kwargs: + raise ValueError( + "keyword parameters must not end with a single '_'. Found: %r" + "\nYou either misspelled an existing configuration parameter " + "or tried to send a query parameter that is reserved. In the " + "latter case, use the `parameters_` dictionary instead." + % invalid_kwargs + ) + parameters = dict(parameters_ or {}, **kwargs) + + if bookmark_manager_ is _default: + bookmark_manager_ = self._query_bookmark_manager + assert bookmark_manager_ is not _default + + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", + message=r".*\bbookmark_manager\b.*", + category=ExperimentalWarning) + session = self.session(database=database_, + impersonated_user=impersonated_user_, + bookmark_manager=bookmark_manager_) + with session: + if routing_ == RoutingControl.WRITERS: + executor = session.execute_write + elif routing_ == RoutingControl.READERS: + executor = session.execute_read + else: + raise ValueError("Invalid routing control value: %r" + % routing_) + return executor( + _work, query_, parameters, result_transformer_ + ) + + @property + @experimental( + "Driver.query_bookmark_manager is experimental. " + "It might be changed or removed any time even without prior notice." + ) + def query_bookmark_manager(self) -> BookmarkManager: + """The driver's default query bookmark manager. + + This is the default :class:`BookmarkManager` used by + :meth:`.execute_query`. This can be used to causally chain + :meth:`.execute_query` calls and sessions. Example:: + + def example(driver: neo4j.Driver) -> None: + driver.execute_query("") + with driver.session( + bookmark_manager=driver.query_bookmark_manager + ) as session: + # every query inside this session will be causally chained + # (i.e., can read what was written by ) + session.run("") + # subsequent execute_query calls will be causally chained + # (i.e., can read what was written by ) + driver.execute_query("") + + **This is experimental.** (See :ref:`filter-warnings-ref`) + It might be changed or removed any time even without prior notice. + + .. versionadded:: 5.5 + """ + return self._query_bookmark_manager + if t.TYPE_CHECKING: def verify_connectivity( @@ -624,6 +966,22 @@ def supports_multi_db(self) -> bool: return session._connection.supports_multiple_databases +def _work( + tx: ManagedTransaction, + query: str, + parameters: t.Dict[str, t.Any], + transformer: t.Callable[[Result], t.Union[_T]] +) -> _T: + res = tx.run(query, parameters) + if transformer is Result.to_eager_result: + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", + message=r".*\bto_eager_result\b.*", + category=ExperimentalWarning) + return transformer(res) + return transformer(res) + + class BoltDriver(_Direct, Driver): """:class:`.BoltDriver` is instantiated for ``bolt`` URIs and addresses a single database machine. This may be a standalone server or diff --git a/src/neo4j/_sync/work/__init__.py b/src/neo4j/_sync/work/__init__.py index 92a1af0d4..e5f1ddb1c 100644 --- a/src/neo4j/_sync/work/__init__.py +++ b/src/neo4j/_sync/work/__init__.py @@ -15,9 +15,11 @@ # See the License for the specific language governing permissions and # limitations under the License. - -from .session import ( +from .result import ( + EagerResult, Result, +) +from .session import ( Session, Workspace, ) @@ -35,4 +37,5 @@ "Transaction", "TransactionBase", "Workspace", + "EagerResult", ] diff --git a/src/neo4j/_sync/work/result.py b/src/neo4j/_sync/work/result.py index c881a19f2..a8a2a2f84 100644 --- a/src/neo4j/_sync/work/result.py +++ b/src/neo4j/_sync/work/result.py @@ -19,7 +19,10 @@ from __future__ import annotations import typing as t -from collections import deque +from collections import ( + deque, + namedtuple, +) from warnings import warn @@ -35,6 +38,7 @@ RecordTableRowExporter, ) from ..._meta import experimental +from ..._work import EagerResult from ...exceptions import ( ResultConsumedError, ResultNotSingleError, @@ -600,6 +604,35 @@ def data(self, *keys: _TResultKey) -> t.List[t.Dict[str, t.Any]]: """ return [record.data(*keys) for record in self] + @experimental( + "Result.to_eager_result is experimental. " + "It might be changed or removed any time even without prior notice." + ) + def to_eager_result(self) -> EagerResult: + """Convert this result to an :class:`.EagerResult`. + + This method exhausts the result and triggers a :meth:`.consume`. + + :returns: all remaining records in the result stream, the result's + summary, and keys as an :class:`.EagerResult` instance. + + :raises ResultConsumedError: if the transaction from which this result + was obtained has been closed or the Result has been explicitly + consumed. + + **This is experimental.** (See :ref:`filter-warnings-ref`) + It might be changed or removed any time even without prior notice. + + .. versionadded:: 5.5 + """ + + self._buffer_all() + return EagerResult( + keys=list(self.keys()), + records=Util.list(self), + summary=self.consume() + ) + def to_df( self, expand: bool = False, @@ -730,9 +763,9 @@ def to_df( df[dt_columns] = df[dt_columns].apply( lambda col: col.map( lambda x: - pd.Timestamp(x.iso_format()) - .replace(tzinfo=getattr(x, "tzinfo", None)) - if x else pd.NaT + pd.Timestamp(x.iso_format()) + .replace(tzinfo=getattr(x, "tzinfo", None)) + if x else pd.NaT ) ) return df diff --git a/src/neo4j/_work/__init__.py b/src/neo4j/_work/__init__.py new file mode 100644 index 000000000..15d85d129 --- /dev/null +++ b/src/neo4j/_work/__init__.py @@ -0,0 +1,24 @@ +# Copyright (c) "Neo4j" +# Neo4j Sweden AB [https://neo4j.com] +# +# This file is part of Neo4j. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ._eager_result import EagerResult + + +__all__ = [ + "EagerResult", +] diff --git a/src/neo4j/_work/_eager_result.py b/src/neo4j/_work/_eager_result.py new file mode 100644 index 000000000..525b064f6 --- /dev/null +++ b/src/neo4j/_work/_eager_result.py @@ -0,0 +1,53 @@ +# Copyright (c) "Neo4j" +# Neo4j Sweden AB [https://neo4j.com] +# +# This file is part of Neo4j. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import typing as t + +from .._data import Record +from ..work import ResultSummary + + +class EagerResult(t.NamedTuple): + """In-memory result of a query. + + It's a named tuple with 3 elements: + * records - the list of records returned by the query + (list of :class:`.Record` objects) + * summary - the summary of the query execution + (:class:`.ResultSummary` object) + * keys - the list of keys returned by the query + (see :attr:`AsyncResult.keys` and :attr:`.Result.keys`) + + **This is experimental.** (See :ref:`filter-warnings-ref`) + It might be changed or removed any time even without prior notice. + + .. seealso:: + :attr:`.AsyncDriver.execute_query`, :attr:`.Driver.execute_query` + Which by default return an instance of this class. + + :attr:`.AsyncResult.to_eager_result`, :attr:`.Result.to_eager_result` + Which can be used to convert to instance of this class. + + .. versionadded:: 5.5 + """ + #: Alias for field 0 (``eager_result[0]``) + records: t.List[Record] + #: Alias for field 1 (``eager_result[1]``) + summary: ResultSummary + #: Alias for field 2 (``eager_result[2]``) + keys: t.List[str] diff --git a/testkitbackend/_async/requests.py b/testkitbackend/_async/requests.py index c3b282165..7b0f00dbe 100644 --- a/testkitbackend/_async/requests.py +++ b/testkitbackend/_async/requests.py @@ -149,7 +149,7 @@ async def NewDriver(backend, data): data.mark_item_as_read_if_equals("livenessCheckTimeoutMs", None) driver = neo4j.AsyncGraphDatabase.driver( - data["uri"], auth=auth, user_agent=data["userAgent"], **kwargs + data["uri"], auth=auth, user_agent=data["userAgent"], **kwargs, ) key = backend.next_key() backend.drivers[key] = driver @@ -188,6 +188,40 @@ async def CheckMultiDBSupport(backend, data): }) +async def ExecuteQuery(backend, data): + driver = backend.drivers[data["driverId"]] + cypher, params = fromtestkit.to_cypher_and_params(data) + config = data.get("config", {}) + kwargs = {} + for config_key, kwargs_key in ( + ("database", "database_"), + ("routing", "routing_"), + ("impersonatedUser", "impersonated_user_"), + ): + value = config.get(config_key, None) + if value is not None: + kwargs[kwargs_key] = value + bookmark_manager_id = config.get("bookmarkManagerId") + if bookmark_manager_id is not None: + if bookmark_manager_id == -1: + kwargs["bookmark_manager_"] = None + else: + bookmark_manager = backend.bookmark_managers[bookmark_manager_id] + kwargs["bookmark_manager_"] = bookmark_manager + + with warning_check( + neo4j.ExperimentalWarning, + "Driver.execute_query is experimental. " + "It might be changed or removed any time even without prior notice." + ): + eager_result = await driver.execute_query(cypher, params, **kwargs) + await backend.send_response("EagerResult", { + "keys": eager_result.keys, + "records": list(map(totestkit.record, eager_result.records)), + "summary": totestkit.summary(eager_result.summary), + }) + + def resolution_func(backend, custom_resolver=False, custom_dns_resolver=False): # This solution (putting custom resolution together with DNS resolution # into one function only works because the Python driver calls the custom @@ -584,42 +618,7 @@ async def ResultConsume(backend, data): summary = await result.consume() from neo4j import ResultSummary assert isinstance(summary, ResultSummary) - await backend.send_response("Summary", { - "serverInfo": { - "address": ":".join(map(str, summary.server.address)), - "agent": summary.server.agent, - "protocolVersion": - ".".join(map(str, summary.server.protocol_version)), - }, - "counters": None if not summary.counters else { - "constraintsAdded": summary.counters.constraints_added, - "constraintsRemoved": summary.counters.constraints_removed, - "containsSystemUpdates": summary.counters.contains_system_updates, - "containsUpdates": summary.counters.contains_updates, - "indexesAdded": summary.counters.indexes_added, - "indexesRemoved": summary.counters.indexes_removed, - "labelsAdded": summary.counters.labels_added, - "labelsRemoved": summary.counters.labels_removed, - "nodesCreated": summary.counters.nodes_created, - "nodesDeleted": summary.counters.nodes_deleted, - "propertiesSet": summary.counters.properties_set, - "relationshipsCreated": summary.counters.relationships_created, - "relationshipsDeleted": summary.counters.relationships_deleted, - "systemUpdates": summary.counters.system_updates, - }, - "database": summary.database, - "notifications": summary.notifications, - "plan": summary.plan, - "profile": summary.profile, - "query": { - "text": summary.query, - "parameters": {k: totestkit.field(v) - for k, v in summary.parameters.items()}, - }, - "queryType": summary.query_type, - "resultAvailableAfter": summary.result_available_after, - "resultConsumedAfter": summary.result_consumed_after, - }) + await backend.send_response("Summary", totestkit.summary(summary)) async def ForcedRoutingTableUpdate(backend, data): diff --git a/testkitbackend/_sync/requests.py b/testkitbackend/_sync/requests.py index ab6ba7929..854bd9042 100644 --- a/testkitbackend/_sync/requests.py +++ b/testkitbackend/_sync/requests.py @@ -149,7 +149,7 @@ def NewDriver(backend, data): data.mark_item_as_read_if_equals("livenessCheckTimeoutMs", None) driver = neo4j.GraphDatabase.driver( - data["uri"], auth=auth, user_agent=data["userAgent"], **kwargs + data["uri"], auth=auth, user_agent=data["userAgent"], **kwargs, ) key = backend.next_key() backend.drivers[key] = driver @@ -188,6 +188,40 @@ def CheckMultiDBSupport(backend, data): }) +def ExecuteQuery(backend, data): + driver = backend.drivers[data["driverId"]] + cypher, params = fromtestkit.to_cypher_and_params(data) + config = data.get("config", {}) + kwargs = {} + for config_key, kwargs_key in ( + ("database", "database_"), + ("routing", "routing_"), + ("impersonatedUser", "impersonated_user_"), + ): + value = config.get(config_key, None) + if value is not None: + kwargs[kwargs_key] = value + bookmark_manager_id = config.get("bookmarkManagerId") + if bookmark_manager_id is not None: + if bookmark_manager_id == -1: + kwargs["bookmark_manager_"] = None + else: + bookmark_manager = backend.bookmark_managers[bookmark_manager_id] + kwargs["bookmark_manager_"] = bookmark_manager + + with warning_check( + neo4j.ExperimentalWarning, + "Driver.execute_query is experimental. " + "It might be changed or removed any time even without prior notice." + ): + eager_result = driver.execute_query(cypher, params, **kwargs) + backend.send_response("EagerResult", { + "keys": eager_result.keys, + "records": list(map(totestkit.record, eager_result.records)), + "summary": totestkit.summary(eager_result.summary), + }) + + def resolution_func(backend, custom_resolver=False, custom_dns_resolver=False): # This solution (putting custom resolution together with DNS resolution # into one function only works because the Python driver calls the custom @@ -584,42 +618,7 @@ def ResultConsume(backend, data): summary = result.consume() from neo4j import ResultSummary assert isinstance(summary, ResultSummary) - backend.send_response("Summary", { - "serverInfo": { - "address": ":".join(map(str, summary.server.address)), - "agent": summary.server.agent, - "protocolVersion": - ".".join(map(str, summary.server.protocol_version)), - }, - "counters": None if not summary.counters else { - "constraintsAdded": summary.counters.constraints_added, - "constraintsRemoved": summary.counters.constraints_removed, - "containsSystemUpdates": summary.counters.contains_system_updates, - "containsUpdates": summary.counters.contains_updates, - "indexesAdded": summary.counters.indexes_added, - "indexesRemoved": summary.counters.indexes_removed, - "labelsAdded": summary.counters.labels_added, - "labelsRemoved": summary.counters.labels_removed, - "nodesCreated": summary.counters.nodes_created, - "nodesDeleted": summary.counters.nodes_deleted, - "propertiesSet": summary.counters.properties_set, - "relationshipsCreated": summary.counters.relationships_created, - "relationshipsDeleted": summary.counters.relationships_deleted, - "systemUpdates": summary.counters.system_updates, - }, - "database": summary.database, - "notifications": summary.notifications, - "plan": summary.plan, - "profile": summary.profile, - "query": { - "text": summary.query, - "parameters": {k: totestkit.field(v) - for k, v in summary.parameters.items()}, - }, - "queryType": summary.query_type, - "resultAvailableAfter": summary.result_available_after, - "resultConsumedAfter": summary.result_consumed_after, - }) + backend.send_response("Summary", totestkit.summary(summary)) def ForcedRoutingTableUpdate(backend, data): diff --git a/testkitbackend/fromtestkit.py b/testkitbackend/fromtestkit.py index e28a63079..0c5405907 100644 --- a/testkitbackend/fromtestkit.py +++ b/testkitbackend/fromtestkit.py @@ -35,7 +35,7 @@ def to_cypher_and_params(data): from .backend import Request - params = data["params"] + params = data.get("params") # Optional if params is None: return data["cypher"], None diff --git a/testkitbackend/test_config.json b/testkitbackend/test_config.json index 18710693f..d8d8bc3e7 100644 --- a/testkitbackend/test_config.json +++ b/testkitbackend/test_config.json @@ -18,6 +18,7 @@ "features": { "Feature:API:BookmarkManager": true, "Feature:API:ConnectionAcquisitionTimeout": true, + "Feature:API:Driver.ExecuteQuery": true, "Feature:API:Driver:GetServerInfo": true, "Feature:API:Driver.IsEncrypted": true, "Feature:API:Driver.VerifyConnectivity": true, diff --git a/testkitbackend/totestkit.py b/testkitbackend/totestkit.py index 180f5b4e8..fadc8b5fe 100644 --- a/testkitbackend/totestkit.py +++ b/testkitbackend/totestkit.py @@ -18,6 +18,7 @@ import math +import neo4j from neo4j.graph import ( Node, Path, @@ -44,6 +45,45 @@ def record(rec): return {"values": fields} +def summary(summary_: neo4j.ResultSummary) -> dict: + return { + "serverInfo": { + "address": ":".join(map(str, summary_.server.address)), + "agent": summary_.server.agent, + "protocolVersion": + ".".join(map(str, summary_.server.protocol_version)), + }, + "counters": None if not summary_.counters else { + "constraintsAdded": summary_.counters.constraints_added, + "constraintsRemoved": summary_.counters.constraints_removed, + "containsSystemUpdates": summary_.counters.contains_system_updates, + "containsUpdates": summary_.counters.contains_updates, + "indexesAdded": summary_.counters.indexes_added, + "indexesRemoved": summary_.counters.indexes_removed, + "labelsAdded": summary_.counters.labels_added, + "labelsRemoved": summary_.counters.labels_removed, + "nodesCreated": summary_.counters.nodes_created, + "nodesDeleted": summary_.counters.nodes_deleted, + "propertiesSet": summary_.counters.properties_set, + "relationshipsCreated": summary_.counters.relationships_created, + "relationshipsDeleted": summary_.counters.relationships_deleted, + "systemUpdates": summary_.counters.system_updates, + }, + "database": summary_.database, + "notifications": summary_.notifications, + "plan": summary_.plan, + "profile": summary_.profile, + "query": { + "text": summary_.query, + "parameters": {k: field(v) + for k, v in (summary_.parameters or {}).items()}, + }, + "queryType": summary_.query_type, + "resultAvailableAfter": summary_.result_available_after, + "resultConsumedAfter": summary_.result_consumed_after, + } + + def field(v): def to(name, val): return {"name": name, "data": {"value": val}} diff --git a/tests/unit/async_/test_driver.py b/tests/unit/async_/test_driver.py index f9007a405..1a82a0435 100644 --- a/tests/unit/async_/test_driver.py +++ b/tests/unit/async_/test_driver.py @@ -20,13 +20,17 @@ import ssl import typing as t +from contextlib import contextmanager import pytest +import typing_extensions as te +import neo4j from neo4j import ( AsyncBoltDriver, AsyncGraphDatabase, AsyncNeo4jDriver, + AsyncResult, ExperimentalWarning, TRUST_ALL_CERTIFICATES, TRUST_SYSTEM_CA_SIGNED_CERTIFICATES, @@ -34,6 +38,7 @@ TrustCustomCAs, TrustSystemCAs, ) +from neo4j._async.driver import _work from neo4j.api import ( AsyncBookmarkManager, BookmarkManager, @@ -48,6 +53,25 @@ ) +@contextmanager +def assert_warns_execute_query_experimental(): + with pytest.warns( + ExperimentalWarning, + match=r"^Driver\.execute_query is experimental\." + ): + yield + + +@contextmanager +def assert_warns_execute_query_bmm_experimental(): + with pytest.warns( + ExperimentalWarning, + match=r"^Driver\.query_bookmark_manager is experimental\." + ): + yield + + + @pytest.mark.parametrize("protocol", ("bolt://", "bolt+s://", "bolt+ssc://")) @pytest.mark.parametrize("host", ("localhost", "127.0.0.1", "[::1]", "[0:0:0:0:0:0:0:1]")) @@ -404,3 +428,361 @@ def forget(self, databases: t.Iterable[str]) -> None: _ = driver.session(bookmark_manager=bmm) session_cls_mock.assert_called_once() assert session_cls_mock.call_args[0][1].bookmark_manager is bmm + + +class SomeClass: + pass + + +@mark_async_test +async def test_execute_query_work(mocker) -> None: + tx_mock = mocker.AsyncMock(spec=neo4j.AsyncManagedTransaction) + transformer_mock = mocker.AsyncMock() + transformer: t.Callable[[AsyncResult], t.Awaitable[SomeClass]] = \ + transformer_mock + query = "QUERY" + parameters = {"para": "meters", "foo": object} + + res: SomeClass = await _work(tx_mock, query, parameters, transformer) + + tx_mock.run.assert_awaited_once_with(query, parameters) + transformer_mock.assert_awaited_once_with(tx_mock.run.return_value) + assert res is transformer_mock.return_value + + +@pytest.mark.parametrize("query", ("foo", "bar", "RETURN 1 AS n")) +@pytest.mark.parametrize("positional", (True, False)) +@mark_async_test +async def test_execute_query_query( + mocker, query: str, positional: bool +) -> None: + driver = AsyncGraphDatabase.driver("bolt://localhost") + session_cls_mock = mocker.patch("neo4j._async.driver.AsyncSession", + autospec=True) + async with driver as driver: + with assert_warns_execute_query_experimental(): + if positional: + res = await driver.execute_query(query) + else: + res = await driver.execute_query(query_=query) + + session_cls_mock.assert_called_once() + session_mock = session_cls_mock.return_value + session_mock.__aenter__.assert_awaited_once() + session_mock.__aexit__.assert_awaited_once() + session_executor_mock = session_mock.execute_write + session_executor_mock.assert_awaited_once_with( + _work, query, mocker.ANY, mocker.ANY + ) + assert res is session_executor_mock.return_value + + +@pytest.mark.parametrize("parameters", ( + ..., None, {}, {"foo": 1}, {"foo": 1, "bar": object()} +)) +@pytest.mark.parametrize("positional", (True, False)) +@mark_async_test +async def test_execute_query_parameters( + mocker, parameters: t.Optional[t.Dict[str, t.Any]], + positional: bool +) -> None: + driver = AsyncGraphDatabase.driver("bolt://localhost") + session_cls_mock = mocker.patch("neo4j._async.driver.AsyncSession", + autospec=True) + async with driver as driver: + with assert_warns_execute_query_experimental(): + if parameters is Ellipsis: + parameters = None + res = await driver.execute_query("") + else: + if positional: + res = await driver.execute_query("", parameters) + else: + res = await driver.execute_query("", + parameters_=parameters) + + session_cls_mock.assert_called_once() + session_mock = session_cls_mock.return_value + session_mock.__aenter__.assert_awaited_once() + session_mock.__aexit__.assert_awaited_once() + session_executor_mock = session_mock.execute_write + session_executor_mock.assert_awaited_once_with( + _work, mocker.ANY, parameters or {}, mocker.ANY + ) + assert res is session_executor_mock.return_value + + +@pytest.mark.parametrize("parameters", ( + None, {}, {"foo": 1}, {"foo": 1, "_bar": object()}, {"__": 1}, {"baz__": 2} +)) +@mark_async_test +async def test_execute_query_keyword_parameters( + mocker, parameters: t.Optional[t.Dict[str, t.Any]], +) -> None: + driver = AsyncGraphDatabase.driver("bolt://localhost") + session_cls_mock = mocker.patch("neo4j._async.driver.AsyncSession", + autospec=True) + async with driver as driver: + with assert_warns_execute_query_experimental(): + if parameters is None: + res = await driver.execute_query("") + else: + res = await driver.execute_query("", **parameters) + + session_cls_mock.assert_called_once() + session_mock = session_cls_mock.return_value + session_mock.__aenter__.assert_awaited_once() + session_mock.__aexit__.assert_awaited_once() + session_executor_mock = session_mock.execute_write + session_executor_mock.assert_awaited_once_with( + _work, mocker.ANY, parameters or {}, mocker.ANY + ) + assert res is session_executor_mock.return_value + + +@pytest.mark.parametrize("parameters", ( + {"_": "a"}, {"foo_": None}, {"foo_": 1, "bar_": 2} +)) +async def test_reserved_query_keyword_parameters( + mocker, parameters: t.Dict[str, t.Any], +) -> None: + driver = AsyncGraphDatabase.driver("bolt://localhost") + mocker.patch("neo4j._async.driver.AsyncSession", autospec=True) + async with driver as driver: + with pytest.raises(ValueError) as exc: + with assert_warns_execute_query_experimental(): + await driver.execute_query("", **parameters) + exc.match("reserved") + exc.match(", ".join(f"'{k}'" for k in parameters)) + + +@pytest.mark.parametrize( + ("params", "kw_params", "expected_params"), + ( + ({"x": 1}, {}, {"x": 1}), + ({}, {"x": 1}, {"x": 1}), + (None, {"x": 1}, {"x": 1}), + ({"x": 1}, {"y": 2}, {"x": 1, "y": 2}), + ({"x": 1}, {"x": 2}, {"x": 2}), + ({"x": 1}, {"x": 2}, {"x": 2}), + ({"x": 1, "y": 3}, {"x": 2}, {"x": 2, "y": 3}), + ({"x": 1}, {"x": 2, "y": 3}, {"x": 2, "y": 3}), + # potentially internally used keyword arguments + ({}, {"timeout": 2}, {"timeout": 2}), + ({"timeout": 2}, {}, {"timeout": 2}), + ({}, {"imp_user": "hans"}, {"imp_user": "hans"}), + ({"imp_user": "hans"}, {}, {"imp_user": "hans"}), + ({}, {"db": "neo4j"}, {"db": "neo4j"}), + ({"db": "neo4j"}, {}, {"db": "neo4j"}), + ({"_": "foobar"}, {}, {"_": "foobar"}), + ({"__": "foobar"}, {}, {"__": "foobar"}), + ({"x_": "foobar"}, {}, {"x_": "foobar"}), + ({"x__": "foobar"}, {}, {"x__": "foobar"}), + ({}, {"database": "neo4j"}, {"database": "neo4j"}), + ({"database": "neo4j"}, {}, {"database": "neo4j"}), + # already taken keyword arguments + ({}, {"database_": "neo4j"}, {}), + ({"database_": "neo4j"}, {}, {"database_": "neo4j"}), + ) +) +@pytest.mark.parametrize("positional", (True, False)) +@mark_async_test +async def test_execute_query_parameter_precedence( + params: t.Optional[t.Dict[str, t.Any]], + kw_params: t.Dict[str, t.Any], + expected_params: t.Dict[str, t.Any], + positional: bool, + mocker +) -> None: + driver = AsyncGraphDatabase.driver("bolt://localhost") + session_cls_mock = mocker.patch("neo4j._async.driver.AsyncSession", + autospec=True) + async with driver as driver: + with assert_warns_execute_query_experimental(): + if params is None: + res = await driver.execute_query("", **kw_params) + else: + if positional: + res = await driver.execute_query("", params, **kw_params) + else: + res = await driver.execute_query("", parameters_=params, + **kw_params) + + session_cls_mock.assert_called_once() + session_mock = session_cls_mock.return_value + session_mock.__aenter__.assert_awaited_once() + session_mock.__aexit__.assert_awaited_once() + session_executor_mock = session_mock.execute_write + session_executor_mock.assert_awaited_once_with( + _work, mocker.ANY, expected_params, mocker.ANY + ) + assert res is session_executor_mock.return_value + + +@pytest.mark.parametrize( + ("routing_mode", "session_executor"), + ( + (None, "execute_write"), + ("r", "execute_read"), + ("w", "execute_write"), + (neo4j.RoutingControl.READERS, "execute_read"), + (neo4j.RoutingControl.WRITERS, "execute_write"), + ) +) +@pytest.mark.parametrize("positional", (True, False)) +@mark_async_test +async def test_execute_query_routing_control( + mocker, session_executor: str, positional: bool, + routing_mode: t.Union[neo4j.RoutingControl, te.Literal["r", "w"], None] +) -> None: + driver = AsyncGraphDatabase.driver("bolt://localhost") + session_cls_mock = mocker.patch("neo4j._async.driver.AsyncSession", + autospec=True) + async with driver as driver: + with assert_warns_execute_query_experimental(): + if routing_mode is None: + res = await driver.execute_query("") + else: + if positional: + res = await driver.execute_query("", None, routing_mode) + else: + res = await driver.execute_query("", routing_=routing_mode) + + session_cls_mock.assert_called_once() + session_mock = session_cls_mock.return_value + session_mock.__aenter__.assert_awaited_once() + session_mock.__aexit__.assert_awaited_once() + session_executor_mock = getattr(session_mock, session_executor) + session_executor_mock.assert_awaited_once_with( + _work, mocker.ANY, mocker.ANY, mocker.ANY + ) + assert res is session_executor_mock.return_value + + +@pytest.mark.parametrize("database", ( + ..., None, "foo", "baz", "neo4j", "system" +)) +@pytest.mark.parametrize("positional", (True, False)) +@mark_async_test +async def test_execute_query_database( + mocker, database: t.Optional[str], positional: bool +) -> None: + driver = AsyncGraphDatabase.driver("bolt://localhost") + session_cls_mock = mocker.patch("neo4j._async.driver.AsyncSession", + autospec=True) + async with driver as driver: + with assert_warns_execute_query_experimental(): + if database is Ellipsis: + database = None + await driver.execute_query("") + else: + if positional: + await driver.execute_query("", None, "w", database) + else: + await driver.execute_query("", database_=database) + + session_cls_mock.assert_called_once() + session_config = session_cls_mock.call_args.args[1] + assert session_config.database == database + + +@pytest.mark.parametrize("impersonated_user", (..., None, "foo", "baz")) +@pytest.mark.parametrize("positional", (True, False)) +@mark_async_test +async def test_execute_query_impersonated_user( + mocker, impersonated_user: t.Optional[str], positional: bool +) -> None: + driver = AsyncGraphDatabase.driver("bolt://localhost") + session_cls_mock = mocker.patch("neo4j._async.driver.AsyncSession", + autospec=True) + async with driver as driver: + with assert_warns_execute_query_experimental(): + if impersonated_user is Ellipsis: + impersonated_user = None + await driver.execute_query("") + else: + if positional: + await driver.execute_query( + "", None, "w", None, impersonated_user + ) + else: + await driver.execute_query( + "", impersonated_user_=impersonated_user + ) + + session_cls_mock.assert_called_once() + session_config = session_cls_mock.call_args.args[1] + assert session_config.impersonated_user == impersonated_user + + +@pytest.mark.parametrize("bookmark_manager", (..., None, object())) +@pytest.mark.parametrize("positional", (True, False)) +@mark_async_test +async def test_execute_query_bookmark_manager( + mocker, positional: bool, + bookmark_manager: t.Union[AsyncBookmarkManager, BookmarkManager, None] +) -> None: + driver = AsyncGraphDatabase.driver("bolt://localhost") + session_cls_mock = mocker.patch("neo4j._async.driver.AsyncSession", + autospec=True) + async with driver as driver: + with assert_warns_execute_query_experimental(): + if bookmark_manager is Ellipsis: + with assert_warns_execute_query_bmm_experimental(): + bookmark_manager = driver.query_bookmark_manager + await driver.execute_query("") + else: + if positional: + await driver.execute_query( + "", None, "w", None, None, bookmark_manager + ) + else: + await driver.execute_query( + "", bookmark_manager_=bookmark_manager + ) + + session_cls_mock.assert_called_once() + session_config = session_cls_mock.call_args.args[1] + assert session_config.bookmark_manager == bookmark_manager + + +@pytest.mark.parametrize("result_transformer", (..., object())) +@pytest.mark.parametrize("positional", (True, False)) +@mark_async_test +async def test_execute_query_result_transformer( + mocker, positional: bool, + result_transformer: t.Callable[[AsyncResult], t.Awaitable[SomeClass]] +) -> None: + driver = AsyncGraphDatabase.driver("bolt://localhost") + session_cls_mock = mocker.patch("neo4j._async.driver.AsyncSession", + autospec=True) + res: t.Any + async with driver as driver: + with assert_warns_execute_query_experimental(): + if result_transformer is Ellipsis: + result_transformer = AsyncResult.to_eager_result + res_default: neo4j.EagerResult = await driver.execute_query("") + res = res_default + else: + res_custom: SomeClass + if positional: + with assert_warns_execute_query_bmm_experimental(): + bmm = driver.query_bookmark_manager + res_custom = await driver.execute_query( + "", None, "w", None, None, bmm, result_transformer + ) + else: + res_custom = await driver.execute_query( + "", result_transformer_=result_transformer + ) + res = res_custom + + session_cls_mock.assert_called_once() + session_mock = session_cls_mock.return_value + session_mock.__aenter__.assert_awaited_once() + session_mock.__aexit__.assert_awaited_once() + session_executor_mock = session_mock.execute_write + session_executor_mock.assert_awaited_once_with( + _work, mocker.ANY, mocker.ANY, result_transformer + ) + assert res is session_executor_mock.return_value diff --git a/tests/unit/async_/work/test_result.py b/tests/unit/async_/work/test_result.py index 4c46e3c12..40b138414 100644 --- a/tests/unit/async_/work/test_result.py +++ b/tests/unit/async_/work/test_result.py @@ -16,7 +16,9 @@ # limitations under the License. +import uuid import warnings +from contextlib import contextmanager from unittest import mock import pandas as pd @@ -26,6 +28,7 @@ from neo4j import ( Address, AsyncResult, + EagerResult, ExperimentalWarning, Record, ResultSummary, @@ -53,6 +56,15 @@ from ...._async_compat import mark_async_test +@contextmanager +def assert_warns_to_eager_result_experimental(): + with pytest.warns( + ExperimentalWarning, + match=r"^Result\.to_eager_result is experimental\." + ): + yield + + class Records: def __init__(self, fields, records): self.fields = tuple(fields) @@ -687,6 +699,50 @@ async def test_result_graph(records): assert rel["since"] == 1999 +@pytest.mark.parametrize("records", ( + Records(["n"], []), + Records(["n"], [[42], [69], [420], [1337]]), + Records(["n1", "r", "n2"], [ + [ + # Node + Structure(b"N", 0, ["Person", "LabelTest1"], {"name": "Alice"}), + # Relationship + Structure(b"R", 0, 0, 1, "KNOWS", {"since": 1999}), + # Node + Structure(b"N", 1, ["Person", "LabelTest2"], {"name": "Bob"}), + ] + ]), + Records(["secret_sauce"], [[object()], [object()]]), +)) +@mark_async_test +async def test_to_eager_result(records): + summary = {"test_to_eager_result": uuid.uuid4()} + connection = AsyncConnectionStub(records=records, summary_meta=summary) + result = AsyncResult(connection, 1, noop, noop) + await result._run("CYPHER", {}, None, None, "r", None) + with assert_warns_to_eager_result_experimental(): + eager_result = await result.to_eager_result() + + assert isinstance(eager_result, EagerResult) + + assert eager_result.records is eager_result[0] + assert isinstance(eager_result.records, list) + assert all(isinstance(r, Record) for r in eager_result.records) + assert len(eager_result.records) == len(records) + assert all(list(record) == list(raw) + for record, raw in zip(eager_result.records, records)) + + assert eager_result.summary is eager_result[1] + assert isinstance(eager_result.summary, ResultSummary) + assert (eager_result.summary.metadata.get("test_to_eager_result") + == summary["test_to_eager_result"]) + + assert eager_result.keys is eager_result[2] + assert isinstance(eager_result.keys, list) + assert all(isinstance(k, str) for k in eager_result.keys) + assert eager_result.keys == list(records.fields) + + @pytest.mark.parametrize( ("keys", "values", "types", "instances"), ( diff --git a/tests/unit/sync/test_driver.py b/tests/unit/sync/test_driver.py index 0f785c64d..29c526f62 100644 --- a/tests/unit/sync/test_driver.py +++ b/tests/unit/sync/test_driver.py @@ -20,20 +20,25 @@ import ssl import typing as t +from contextlib import contextmanager import pytest +import typing_extensions as te +import neo4j from neo4j import ( BoltDriver, ExperimentalWarning, GraphDatabase, Neo4jDriver, + Result, TRUST_ALL_CERTIFICATES, TRUST_SYSTEM_CA_SIGNED_CERTIFICATES, TrustAll, TrustCustomCAs, TrustSystemCAs, ) +from neo4j._sync.driver import _work from neo4j.api import ( BookmarkManager, READ_ACCESS, @@ -47,6 +52,25 @@ ) +@contextmanager +def assert_warns_execute_query_experimental(): + with pytest.warns( + ExperimentalWarning, + match=r"^Driver\.execute_query is experimental\." + ): + yield + + +@contextmanager +def assert_warns_execute_query_bmm_experimental(): + with pytest.warns( + ExperimentalWarning, + match=r"^Driver\.query_bookmark_manager is experimental\." + ): + yield + + + @pytest.mark.parametrize("protocol", ("bolt://", "bolt+s://", "bolt+ssc://")) @pytest.mark.parametrize("host", ("localhost", "127.0.0.1", "[::1]", "[0:0:0:0:0:0:0:1]")) @@ -403,3 +427,361 @@ def forget(self, databases: t.Iterable[str]) -> None: _ = driver.session(bookmark_manager=bmm) session_cls_mock.assert_called_once() assert session_cls_mock.call_args[0][1].bookmark_manager is bmm + + +class SomeClass: + pass + + +@mark_sync_test +def test_execute_query_work(mocker) -> None: + tx_mock = mocker.Mock(spec=neo4j.ManagedTransaction) + transformer_mock = mocker.Mock() + transformer: t.Callable[[Result], t.Union[SomeClass]] = \ + transformer_mock + query = "QUERY" + parameters = {"para": "meters", "foo": object} + + res: SomeClass = _work(tx_mock, query, parameters, transformer) + + tx_mock.run.assert_called_once_with(query, parameters) + transformer_mock.assert_called_once_with(tx_mock.run.return_value) + assert res is transformer_mock.return_value + + +@pytest.mark.parametrize("query", ("foo", "bar", "RETURN 1 AS n")) +@pytest.mark.parametrize("positional", (True, False)) +@mark_sync_test +def test_execute_query_query( + mocker, query: str, positional: bool +) -> None: + driver = GraphDatabase.driver("bolt://localhost") + session_cls_mock = mocker.patch("neo4j._sync.driver.Session", + autospec=True) + with driver as driver: + with assert_warns_execute_query_experimental(): + if positional: + res = driver.execute_query(query) + else: + res = driver.execute_query(query_=query) + + session_cls_mock.assert_called_once() + session_mock = session_cls_mock.return_value + session_mock.__enter__.assert_called_once() + session_mock.__exit__.assert_called_once() + session_executor_mock = session_mock.execute_write + session_executor_mock.assert_called_once_with( + _work, query, mocker.ANY, mocker.ANY + ) + assert res is session_executor_mock.return_value + + +@pytest.mark.parametrize("parameters", ( + ..., None, {}, {"foo": 1}, {"foo": 1, "bar": object()} +)) +@pytest.mark.parametrize("positional", (True, False)) +@mark_sync_test +def test_execute_query_parameters( + mocker, parameters: t.Optional[t.Dict[str, t.Any]], + positional: bool +) -> None: + driver = GraphDatabase.driver("bolt://localhost") + session_cls_mock = mocker.patch("neo4j._sync.driver.Session", + autospec=True) + with driver as driver: + with assert_warns_execute_query_experimental(): + if parameters is Ellipsis: + parameters = None + res = driver.execute_query("") + else: + if positional: + res = driver.execute_query("", parameters) + else: + res = driver.execute_query("", + parameters_=parameters) + + session_cls_mock.assert_called_once() + session_mock = session_cls_mock.return_value + session_mock.__enter__.assert_called_once() + session_mock.__exit__.assert_called_once() + session_executor_mock = session_mock.execute_write + session_executor_mock.assert_called_once_with( + _work, mocker.ANY, parameters or {}, mocker.ANY + ) + assert res is session_executor_mock.return_value + + +@pytest.mark.parametrize("parameters", ( + None, {}, {"foo": 1}, {"foo": 1, "_bar": object()}, {"__": 1}, {"baz__": 2} +)) +@mark_sync_test +def test_execute_query_keyword_parameters( + mocker, parameters: t.Optional[t.Dict[str, t.Any]], +) -> None: + driver = GraphDatabase.driver("bolt://localhost") + session_cls_mock = mocker.patch("neo4j._sync.driver.Session", + autospec=True) + with driver as driver: + with assert_warns_execute_query_experimental(): + if parameters is None: + res = driver.execute_query("") + else: + res = driver.execute_query("", **parameters) + + session_cls_mock.assert_called_once() + session_mock = session_cls_mock.return_value + session_mock.__enter__.assert_called_once() + session_mock.__exit__.assert_called_once() + session_executor_mock = session_mock.execute_write + session_executor_mock.assert_called_once_with( + _work, mocker.ANY, parameters or {}, mocker.ANY + ) + assert res is session_executor_mock.return_value + + +@pytest.mark.parametrize("parameters", ( + {"_": "a"}, {"foo_": None}, {"foo_": 1, "bar_": 2} +)) +def test_reserved_query_keyword_parameters( + mocker, parameters: t.Dict[str, t.Any], +) -> None: + driver = GraphDatabase.driver("bolt://localhost") + mocker.patch("neo4j._sync.driver.Session", autospec=True) + with driver as driver: + with pytest.raises(ValueError) as exc: + with assert_warns_execute_query_experimental(): + driver.execute_query("", **parameters) + exc.match("reserved") + exc.match(", ".join(f"'{k}'" for k in parameters)) + + +@pytest.mark.parametrize( + ("params", "kw_params", "expected_params"), + ( + ({"x": 1}, {}, {"x": 1}), + ({}, {"x": 1}, {"x": 1}), + (None, {"x": 1}, {"x": 1}), + ({"x": 1}, {"y": 2}, {"x": 1, "y": 2}), + ({"x": 1}, {"x": 2}, {"x": 2}), + ({"x": 1}, {"x": 2}, {"x": 2}), + ({"x": 1, "y": 3}, {"x": 2}, {"x": 2, "y": 3}), + ({"x": 1}, {"x": 2, "y": 3}, {"x": 2, "y": 3}), + # potentially internally used keyword arguments + ({}, {"timeout": 2}, {"timeout": 2}), + ({"timeout": 2}, {}, {"timeout": 2}), + ({}, {"imp_user": "hans"}, {"imp_user": "hans"}), + ({"imp_user": "hans"}, {}, {"imp_user": "hans"}), + ({}, {"db": "neo4j"}, {"db": "neo4j"}), + ({"db": "neo4j"}, {}, {"db": "neo4j"}), + ({"_": "foobar"}, {}, {"_": "foobar"}), + ({"__": "foobar"}, {}, {"__": "foobar"}), + ({"x_": "foobar"}, {}, {"x_": "foobar"}), + ({"x__": "foobar"}, {}, {"x__": "foobar"}), + ({}, {"database": "neo4j"}, {"database": "neo4j"}), + ({"database": "neo4j"}, {}, {"database": "neo4j"}), + # already taken keyword arguments + ({}, {"database_": "neo4j"}, {}), + ({"database_": "neo4j"}, {}, {"database_": "neo4j"}), + ) +) +@pytest.mark.parametrize("positional", (True, False)) +@mark_sync_test +def test_execute_query_parameter_precedence( + params: t.Optional[t.Dict[str, t.Any]], + kw_params: t.Dict[str, t.Any], + expected_params: t.Dict[str, t.Any], + positional: bool, + mocker +) -> None: + driver = GraphDatabase.driver("bolt://localhost") + session_cls_mock = mocker.patch("neo4j._sync.driver.Session", + autospec=True) + with driver as driver: + with assert_warns_execute_query_experimental(): + if params is None: + res = driver.execute_query("", **kw_params) + else: + if positional: + res = driver.execute_query("", params, **kw_params) + else: + res = driver.execute_query("", parameters_=params, + **kw_params) + + session_cls_mock.assert_called_once() + session_mock = session_cls_mock.return_value + session_mock.__enter__.assert_called_once() + session_mock.__exit__.assert_called_once() + session_executor_mock = session_mock.execute_write + session_executor_mock.assert_called_once_with( + _work, mocker.ANY, expected_params, mocker.ANY + ) + assert res is session_executor_mock.return_value + + +@pytest.mark.parametrize( + ("routing_mode", "session_executor"), + ( + (None, "execute_write"), + ("r", "execute_read"), + ("w", "execute_write"), + (neo4j.RoutingControl.READERS, "execute_read"), + (neo4j.RoutingControl.WRITERS, "execute_write"), + ) +) +@pytest.mark.parametrize("positional", (True, False)) +@mark_sync_test +def test_execute_query_routing_control( + mocker, session_executor: str, positional: bool, + routing_mode: t.Union[neo4j.RoutingControl, te.Literal["r", "w"], None] +) -> None: + driver = GraphDatabase.driver("bolt://localhost") + session_cls_mock = mocker.patch("neo4j._sync.driver.Session", + autospec=True) + with driver as driver: + with assert_warns_execute_query_experimental(): + if routing_mode is None: + res = driver.execute_query("") + else: + if positional: + res = driver.execute_query("", None, routing_mode) + else: + res = driver.execute_query("", routing_=routing_mode) + + session_cls_mock.assert_called_once() + session_mock = session_cls_mock.return_value + session_mock.__enter__.assert_called_once() + session_mock.__exit__.assert_called_once() + session_executor_mock = getattr(session_mock, session_executor) + session_executor_mock.assert_called_once_with( + _work, mocker.ANY, mocker.ANY, mocker.ANY + ) + assert res is session_executor_mock.return_value + + +@pytest.mark.parametrize("database", ( + ..., None, "foo", "baz", "neo4j", "system" +)) +@pytest.mark.parametrize("positional", (True, False)) +@mark_sync_test +def test_execute_query_database( + mocker, database: t.Optional[str], positional: bool +) -> None: + driver = GraphDatabase.driver("bolt://localhost") + session_cls_mock = mocker.patch("neo4j._sync.driver.Session", + autospec=True) + with driver as driver: + with assert_warns_execute_query_experimental(): + if database is Ellipsis: + database = None + driver.execute_query("") + else: + if positional: + driver.execute_query("", None, "w", database) + else: + driver.execute_query("", database_=database) + + session_cls_mock.assert_called_once() + session_config = session_cls_mock.call_args.args[1] + assert session_config.database == database + + +@pytest.mark.parametrize("impersonated_user", (..., None, "foo", "baz")) +@pytest.mark.parametrize("positional", (True, False)) +@mark_sync_test +def test_execute_query_impersonated_user( + mocker, impersonated_user: t.Optional[str], positional: bool +) -> None: + driver = GraphDatabase.driver("bolt://localhost") + session_cls_mock = mocker.patch("neo4j._sync.driver.Session", + autospec=True) + with driver as driver: + with assert_warns_execute_query_experimental(): + if impersonated_user is Ellipsis: + impersonated_user = None + driver.execute_query("") + else: + if positional: + driver.execute_query( + "", None, "w", None, impersonated_user + ) + else: + driver.execute_query( + "", impersonated_user_=impersonated_user + ) + + session_cls_mock.assert_called_once() + session_config = session_cls_mock.call_args.args[1] + assert session_config.impersonated_user == impersonated_user + + +@pytest.mark.parametrize("bookmark_manager", (..., None, object())) +@pytest.mark.parametrize("positional", (True, False)) +@mark_sync_test +def test_execute_query_bookmark_manager( + mocker, positional: bool, + bookmark_manager: t.Union[BookmarkManager, BookmarkManager, None] +) -> None: + driver = GraphDatabase.driver("bolt://localhost") + session_cls_mock = mocker.patch("neo4j._sync.driver.Session", + autospec=True) + with driver as driver: + with assert_warns_execute_query_experimental(): + if bookmark_manager is Ellipsis: + with assert_warns_execute_query_bmm_experimental(): + bookmark_manager = driver.query_bookmark_manager + driver.execute_query("") + else: + if positional: + driver.execute_query( + "", None, "w", None, None, bookmark_manager + ) + else: + driver.execute_query( + "", bookmark_manager_=bookmark_manager + ) + + session_cls_mock.assert_called_once() + session_config = session_cls_mock.call_args.args[1] + assert session_config.bookmark_manager == bookmark_manager + + +@pytest.mark.parametrize("result_transformer", (..., object())) +@pytest.mark.parametrize("positional", (True, False)) +@mark_sync_test +def test_execute_query_result_transformer( + mocker, positional: bool, + result_transformer: t.Callable[[Result], t.Union[SomeClass]] +) -> None: + driver = GraphDatabase.driver("bolt://localhost") + session_cls_mock = mocker.patch("neo4j._sync.driver.Session", + autospec=True) + res: t.Any + with driver as driver: + with assert_warns_execute_query_experimental(): + if result_transformer is Ellipsis: + result_transformer = Result.to_eager_result + res_default: neo4j.EagerResult = driver.execute_query("") + res = res_default + else: + res_custom: SomeClass + if positional: + with assert_warns_execute_query_bmm_experimental(): + bmm = driver.query_bookmark_manager + res_custom = driver.execute_query( + "", None, "w", None, None, bmm, result_transformer + ) + else: + res_custom = driver.execute_query( + "", result_transformer_=result_transformer + ) + res = res_custom + + session_cls_mock.assert_called_once() + session_mock = session_cls_mock.return_value + session_mock.__enter__.assert_called_once() + session_mock.__exit__.assert_called_once() + session_executor_mock = session_mock.execute_write + session_executor_mock.assert_called_once_with( + _work, mocker.ANY, mocker.ANY, result_transformer + ) + assert res is session_executor_mock.return_value diff --git a/tests/unit/sync/work/test_result.py b/tests/unit/sync/work/test_result.py index cc65a17c2..40077093f 100644 --- a/tests/unit/sync/work/test_result.py +++ b/tests/unit/sync/work/test_result.py @@ -16,7 +16,9 @@ # limitations under the License. +import uuid import warnings +from contextlib import contextmanager from unittest import mock import pandas as pd @@ -25,6 +27,7 @@ from neo4j import ( Address, + EagerResult, ExperimentalWarning, Record, Result, @@ -53,6 +56,15 @@ from ...._async_compat import mark_sync_test +@contextmanager +def assert_warns_to_eager_result_experimental(): + with pytest.warns( + ExperimentalWarning, + match=r"^Result\.to_eager_result is experimental\." + ): + yield + + class Records: def __init__(self, fields, records): self.fields = tuple(fields) @@ -687,6 +699,50 @@ def test_result_graph(records): assert rel["since"] == 1999 +@pytest.mark.parametrize("records", ( + Records(["n"], []), + Records(["n"], [[42], [69], [420], [1337]]), + Records(["n1", "r", "n2"], [ + [ + # Node + Structure(b"N", 0, ["Person", "LabelTest1"], {"name": "Alice"}), + # Relationship + Structure(b"R", 0, 0, 1, "KNOWS", {"since": 1999}), + # Node + Structure(b"N", 1, ["Person", "LabelTest2"], {"name": "Bob"}), + ] + ]), + Records(["secret_sauce"], [[object()], [object()]]), +)) +@mark_sync_test +def test_to_eager_result(records): + summary = {"test_to_eager_result": uuid.uuid4()} + connection = ConnectionStub(records=records, summary_meta=summary) + result = Result(connection, 1, noop, noop) + result._run("CYPHER", {}, None, None, "r", None) + with assert_warns_to_eager_result_experimental(): + eager_result = result.to_eager_result() + + assert isinstance(eager_result, EagerResult) + + assert eager_result.records is eager_result[0] + assert isinstance(eager_result.records, list) + assert all(isinstance(r, Record) for r in eager_result.records) + assert len(eager_result.records) == len(records) + assert all(list(record) == list(raw) + for record, raw in zip(eager_result.records, records)) + + assert eager_result.summary is eager_result[1] + assert isinstance(eager_result.summary, ResultSummary) + assert (eager_result.summary.metadata.get("test_to_eager_result") + == summary["test_to_eager_result"]) + + assert eager_result.keys is eager_result[2] + assert isinstance(eager_result.keys, list) + assert all(isinstance(k, str) for k in eager_result.keys) + assert eager_result.keys == list(records.fields) + + @pytest.mark.parametrize( ("keys", "values", "types", "instances"), (