Skip to content

Propagate errors across all results in a transaction #973

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1902,6 +1902,8 @@ Client-side errors

* :class:`neo4j.exceptions.ResultError`

* :class:`neo4j.exceptions.ResultFailedError`

* :class:`neo4j.exceptions.ResultConsumedError`

* :class:`neo4j.exceptions.ResultNotSingleError`
Expand Down Expand Up @@ -1946,6 +1948,9 @@ Client-side errors
:show-inheritance:
:members: result

.. autoexception:: neo4j.exceptions.ResultFailedError()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understand if we have settled on a common name for this already. but is this error indicative what is actually wrong? "The result failed" is very vague, would something like: TransactionInvalidStateError be a bit more explanative?
if the detail message could be:
The transaction that contains this result has encountered error, all results in this transaction are therefore invalid. if you could attach the transaction to the error so users can access it that could be useful for debugging

Copy link
Member Author

@robsdedude robsdedude Oct 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have not defined what exact error should be thrown. So this sure is up for debate.

However, there is more detail to this massage:

The error name is purposefully vague because it can have multiple reasons why the result would throw it:

  • The result in question failed but the user ignored it and kept using the result
  • A different result in the same transaction failed but the user ignored it

In either case, the raise ResultFailedError(...) from ... here (specifically the from bit) will make the error say something like "this is a direct cause of <insert original error with stack trace>".

Copy link
Contributor

@thelonelyvulpes thelonelyvulpes Oct 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, should we consider exposing a property isFailed:boolean on the result? Whether that is in this change or subsequent one.

:show-inheritance:

.. autoexception:: neo4j.exceptions.ResultConsumedError()
:show-inheritance:

Expand Down
25 changes: 24 additions & 1 deletion src/neo4j/_async/work/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
)
from ...exceptions import (
ResultConsumedError,
ResultFailedError,
ResultNotSingleError,
)
from ...time import (
Expand All @@ -57,6 +58,10 @@
_TResultKey = t.Union[int, str]


_RESULT_FAILED_ERROR = (
"The result has failed. Either this result or another result in the same "
"transaction has encountered an error."
)
_RESULT_OUT_OF_SCOPE_ERROR = (
"The result is out of scope. The associated transaction "
"has been closed. Results can only be used while the "
Expand All @@ -76,8 +81,11 @@ class AsyncResult:
"""

def __init__(self, connection, fetch_size, on_closed, on_error):
self._connection = ConnectionErrorHandler(connection, on_error)
self._connection = ConnectionErrorHandler(
connection, self._connection_error_handler
)
self._hydration_scope = connection.new_hydration_scope()
self._on_error = on_error
self._on_closed = on_closed
self._metadata = None
self._keys = None
Expand All @@ -101,6 +109,13 @@ def __init__(self, connection, fetch_size, on_closed, on_error):
self._consumed = False
# the result has been closed as a result of closing the transaction
self._out_of_scope = False
# exception shared across all results of a transaction
self._exception = None

async def _connection_error_handler(self, exc):
self._exception = exc
self._attached = False
await AsyncUtil.callback(self._on_error, exc)

@property
def _qid(self):
Expand Down Expand Up @@ -257,6 +272,9 @@ async def __aiter__(self) -> t.AsyncIterator[Record]:
await self._connection.send_all()

self._exhausted = True
if self._exception is not None:
raise ResultFailedError(self, _RESULT_FAILED_ERROR) \
from self._exception
if self._out_of_scope:
raise ResultConsumedError(self, _RESULT_OUT_OF_SCOPE_ERROR)
if self._consumed:
Expand Down Expand Up @@ -346,6 +364,11 @@ async def _tx_end(self):
await self._exhaust()
self._out_of_scope = True

def _tx_failure(self, exc):
# Handle failure of the associated transaction.
self._attached = False
self._exception = exc

async def consume(self) -> ResultSummary:
"""Consume the remainder of this result and return a :class:`neo4j.ResultSummary`.

Expand Down
2 changes: 2 additions & 0 deletions src/neo4j/_async/work/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ async def _result_on_closed_handler(self):

async def _error_handler(self, exc):
self._last_error = exc
for result in self._results:
result._tx_failure(exc)
if isinstance(exc, asyncio.CancelledError):
self._cancel()
return
Expand Down
25 changes: 24 additions & 1 deletion src/neo4j/_sync/work/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
)
from ...exceptions import (
ResultConsumedError,
ResultFailedError,
ResultNotSingleError,
)
from ...time import (
Expand All @@ -57,6 +58,10 @@
_TResultKey = t.Union[int, str]


_RESULT_FAILED_ERROR = (
"The result has failed. Either this result or another result in the same "
"transaction has encountered an error."
)
_RESULT_OUT_OF_SCOPE_ERROR = (
"The result is out of scope. The associated transaction "
"has been closed. Results can only be used while the "
Expand All @@ -76,8 +81,11 @@ class Result:
"""

def __init__(self, connection, fetch_size, on_closed, on_error):
self._connection = ConnectionErrorHandler(connection, on_error)
self._connection = ConnectionErrorHandler(
connection, self._connection_error_handler
)
self._hydration_scope = connection.new_hydration_scope()
self._on_error = on_error
self._on_closed = on_closed
self._metadata = None
self._keys = None
Expand All @@ -101,6 +109,13 @@ def __init__(self, connection, fetch_size, on_closed, on_error):
self._consumed = False
# the result has been closed as a result of closing the transaction
self._out_of_scope = False
# exception shared across all results of a transaction
self._exception = None

def _connection_error_handler(self, exc):
self._exception = exc
self._attached = False
Util.callback(self._on_error, exc)

@property
def _qid(self):
Expand Down Expand Up @@ -257,6 +272,9 @@ def __iter__(self) -> t.Iterator[Record]:
self._connection.send_all()

self._exhausted = True
if self._exception is not None:
raise ResultFailedError(self, _RESULT_FAILED_ERROR) \
from self._exception
if self._out_of_scope:
raise ResultConsumedError(self, _RESULT_OUT_OF_SCOPE_ERROR)
if self._consumed:
Expand Down Expand Up @@ -346,6 +364,11 @@ def _tx_end(self):
self._exhaust()
self._out_of_scope = True

def _tx_failure(self, exc):
# Handle failure of the associated transaction.
self._attached = False
self._exception = exc

def consume(self) -> ResultSummary:
"""Consume the remainder of this result and return a :class:`neo4j.ResultSummary`.

Expand Down
2 changes: 2 additions & 0 deletions src/neo4j/_sync/work/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ def _result_on_closed_handler(self):

def _error_handler(self, exc):
self._last_error = exc
for result in self._results:
result._tx_failure(exc)
if isinstance(exc, asyncio.CancelledError):
self._cancel()
return
Expand Down
12 changes: 12 additions & 0 deletions src/neo4j/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
+ TransactionError
+ TransactionNestingError
+ ResultError
+ ResultFailedError
+ ResultConsumedError
+ ResultNotSingleError
+ BrokenRecordError
Expand Down Expand Up @@ -464,6 +465,17 @@ def __init__(self, result_, *args, **kwargs):
self.result = result_


# DriverError > ResultError > ResultFailedError
class ResultFailedError(ResultError):
"""Raised when trying to access records of a failed result.

A :class:`.Result` will be considered failed if
* itself encountered an error while fetching records
* another result within the same transaction encountered an error while
fetching records
"""


# DriverError > ResultError > ResultConsumedError
class ResultConsumedError(ResultError):
"""Raised when trying to access records of a consumed result."""
Expand Down
2 changes: 1 addition & 1 deletion testkitbackend/_async/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ async def ExecuteQuery(backend, data):

def resolution_func(backend, custom_resolver=False, custom_dns_resolver=False):
# This solution (putting custom resolution together with DNS resolution
# into one function only works because the Python driver calls the custom
# into one function) only works because the Python driver calls the custom
# resolver function for every connection, which is not true for all
# drivers. Properly exposing a way to change the DNS lookup behavior is not
# possible without changing the driver's code.
Expand Down
2 changes: 1 addition & 1 deletion testkitbackend/_sync/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ def ExecuteQuery(backend, data):

def resolution_func(backend, custom_resolver=False, custom_dns_resolver=False):
# This solution (putting custom resolution together with DNS resolution
# into one function only works because the Python driver calls the custom
# into one function) only works because the Python driver calls the custom
# resolver function for every connection, which is not true for all
# drivers. Properly exposing a way to change the DNS lookup behavior is not
# possible without changing the driver's code.
Expand Down
8 changes: 1 addition & 7 deletions testkitbackend/test_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,7 @@
"'neo4j.datatypes.test_temporal_types.TestDataTypes.test_should_echo_all_timezone_ids'":
"test_subtest_skips.dt_conversion",
"'neo4j.datatypes.test_temporal_types.TestDataTypes.test_date_time_cypher_created_tz_id'":
"test_subtest_skips.tz_id",
"'stub.tx_run.test_tx_run.TestTxRun.test_should_prevent_discard_after_tx_termination_on_run'":
"Fixme: transactions don't prevent further actions after failure.",
"'stub.tx_run.test_tx_run.TestTxRun.test_should_prevent_pull_after_tx_termination_on_pull'":
"Fixme: transactions don't prevent further actions after failure.",
"'stub.tx_run.test_tx_run.TestTxRun.test_should_prevent_pull_after_tx_termination_on_run'":
"Fixme: transactions don't prevent further actions after failure."
"test_subtest_skips.tz_id"
},
"features": {
"Feature:API:BookmarkManager": true,
Expand Down
12 changes: 11 additions & 1 deletion tests/unit/async_/fixtures/fake_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from neo4j._async.io import AsyncBolt
from neo4j._deadline import Deadline
from neo4j.auth_management import AsyncAuthManager
from neo4j.exceptions import Neo4jError


__all__ = [
Expand Down Expand Up @@ -154,10 +155,12 @@ def set_script(self, callbacks):
[
("run", {"on_success": ({},), "on_summary": None}),
("pull", {
"on_records": ([some_record],),
"on_success": None,
"on_summary": None,
"on_records":
})
# use any exception to throw it instead of calling handlers
("commit", RuntimeError("oh no!"))
]
```
Note that arguments can be `None`. In this case, ScriptedConnection
Expand All @@ -180,6 +183,9 @@ def func(*args, **kwargs):
self._script_pos += 1

async def callback():
if isinstance(scripted_callbacks, BaseException):
raise scripted_callbacks
error = None
for cb_name, default_cb_args in (
("on_ignored", ({},)),
("on_failure", ({},)),
Expand All @@ -197,10 +203,14 @@ async def callback():
if cb_args is None:
cb_args = default_cb_args
res = cb(*cb_args)
if cb_name == "on_failure":
error = Neo4jError.hydrate(**cb_args[0])
try:
await res # maybe the callback is async
except TypeError:
pass # or maybe it wasn't ;)
if error is not None:
raise error

self.callbacks.append(callback)

Expand Down
54 changes: 53 additions & 1 deletion tests/unit/async_/work/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@


from unittest.mock import MagicMock
from uuid import uuid4

import pytest

Expand All @@ -26,6 +25,11 @@
NotificationMinimumSeverity,
Query,
)
from neo4j.exceptions import (
ClientError,
ResultFailedError,
ServiceUnavailable,
)

from ...._async_compat import mark_async_test

Expand Down Expand Up @@ -275,3 +279,51 @@ async def test_transaction_begin_pipelining(
expected_calls.append(("send_all",))
expected_calls.append(("fetch_all",))
assert async_fake_connection.method_calls == expected_calls


@pytest.mark.parametrize("error", ("server", "connection"))
@mark_async_test
async def test_server_error_propagates(async_scripted_connection, error):
connection = async_scripted_connection
script = [
# res 1
("run", {"on_success": ({"fields": ["n"]},), "on_summary": None}),
("pull", {"on_records": ([[1], [2]],),
"on_success": ({"has_more": True},)}),
# res 2
("run", {"on_success": ({"fields": ["n"]},), "on_summary": None}),
("pull", {"on_records": ([[1], [2]],),
"on_success": ({"has_more": True},)}),
]
if error == "server":
script.append(
("pull", {"on_failure": ({"code": "Neo.ClientError.Made.Up"},),
"on_summary": None})
)
expected_error = ClientError
elif error == "connection":
script.append(("pull", ServiceUnavailable()))
expected_error = ServiceUnavailable
else:
raise ValueError(f"Unknown error type {error}")
connection.set_script(script)

tx = AsyncTransaction(
connection, 2, lambda *args, **kwargs: None,
lambda *args, **kwargs: None, lambda *args, **kwargs: None
)
res1 = await tx.run("UNWIND range(1, 1000) AS n RETURN n")
assert await res1.__anext__() == {"n": 1}

res2 = await tx.run("RETURN 'causes error later'")
assert await res2.fetch(2) == [{"n": 1}, {"n": 2}]
with pytest.raises(expected_error) as exc1:
await res2.__anext__()

# can finish the buffer
assert await res1.fetch(1) == [{"n": 2}]
# then fails because the connection was broken by res2
with pytest.raises(ResultFailedError) as exc2:
await res1.__anext__()

assert exc1.value is exc2.value.__cause__
12 changes: 11 additions & 1 deletion tests/unit/sync/fixtures/fake_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from neo4j._deadline import Deadline
from neo4j._sync.io import Bolt
from neo4j.auth_management import AuthManager
from neo4j.exceptions import Neo4jError


__all__ = [
Expand Down Expand Up @@ -154,10 +155,12 @@ def set_script(self, callbacks):
[
("run", {"on_success": ({},), "on_summary": None}),
("pull", {
"on_records": ([some_record],),
"on_success": None,
"on_summary": None,
"on_records":
})
# use any exception to throw it instead of calling handlers
("commit", RuntimeError("oh no!"))
]
```
Note that arguments can be `None`. In this case, ScriptedConnection
Expand All @@ -180,6 +183,9 @@ def func(*args, **kwargs):
self._script_pos += 1

def callback():
if isinstance(scripted_callbacks, BaseException):
raise scripted_callbacks
error = None
for cb_name, default_cb_args in (
("on_ignored", ({},)),
("on_failure", ({},)),
Expand All @@ -197,10 +203,14 @@ def callback():
if cb_args is None:
cb_args = default_cb_args
res = cb(*cb_args)
if cb_name == "on_failure":
error = Neo4jError.hydrate(**cb_args[0])
try:
res # maybe the callback is async
except TypeError:
pass # or maybe it wasn't ;)
if error is not None:
raise error

self.callbacks.append(callback)

Expand Down
Loading