Skip to content

Commit d4d1c84

Browse files
authored
Optimize driver.execute_query by pipelining BEGIN (#956)
Signed-off-by: Grant Lodge <[email protected]>
1 parent e11879e commit d4d1c84

File tree

13 files changed

+295
-136
lines changed

13 files changed

+295
-136
lines changed

src/neo4j/_async/driver.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -860,9 +860,10 @@ async def example(driver: neo4j.AsyncDriver) -> neo4j.Record::
860860
else:
861861
raise ValueError("Invalid routing control value: %r"
862862
% routing_)
863-
return await executor(
864-
_work, query_, parameters, result_transformer_
865-
)
863+
with session._pipelined_begin:
864+
return await executor(
865+
_work, query_, parameters, result_transformer_
866+
)
866867

867868
@property
868869
def execute_query_bookmark_manager(self) -> AsyncBookmarkManager:

src/neo4j/_async/work/session.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
deprecated,
3333
PreviewWarning,
3434
)
35+
from ..._util import ContextBool
3536
from ..._work import Query
3637
from ...api import (
3738
Bookmarks,
@@ -100,6 +101,10 @@ class AsyncSession(AsyncWorkspace):
100101
# The state this session is in.
101102
_state_failed = False
102103

104+
_config: SessionConfig
105+
_bookmark_manager: t.Optional[Bookmarks]
106+
_pipelined_begin: ContextBool
107+
103108
def __init__(self, pool, session_config):
104109
assert isinstance(session_config, SessionConfig)
105110
if session_config.auth is not None:
@@ -115,6 +120,7 @@ def __init__(self, pool, session_config):
115120
self._config = session_config
116121
self._initialize_bookmarks(session_config.bookmarks)
117122
self._bookmark_manager = session_config.bookmark_manager
123+
self._pipelined_begin = ContextBool()
118124

119125
async def __aenter__(self) -> AsyncSession:
120126
return self
@@ -421,6 +427,7 @@ async def _open_transaction(
421427
bookmarks, access_mode, metadata, timeout,
422428
self._config.notifications_min_severity,
423429
self._config.notifications_disabled_categories,
430+
pipelined=self._pipelined_begin
424431
)
425432

426433
async def begin_transaction(
@@ -480,9 +487,15 @@ async def begin_transaction(
480487

481488
return t.cast(AsyncTransaction, self._transaction)
482489

490+
483491
async def _run_transaction(
484-
self, access_mode, transaction_function, *args, **kwargs
485-
):
492+
self,
493+
access_mode: str,
494+
transaction_function: t.Callable[
495+
te.Concatenate[AsyncManagedTransaction, _P], t.Awaitable[_R]
496+
],
497+
*args: _P.args, **kwargs: _P.kwargs
498+
) -> _R:
486499
self._check_state()
487500
if not callable(transaction_function):
488501
raise TypeError("Unit of work is not callable")
@@ -498,7 +511,7 @@ async def _run_transaction(
498511

499512
errors = []
500513

501-
t0 = -1 # Timer
514+
t0: float = -1 # Timer
502515

503516
while True:
504517
try:
@@ -507,6 +520,7 @@ async def _run_transaction(
507520
access_mode=access_mode, metadata=metadata,
508521
timeout=timeout
509522
)
523+
assert isinstance(self._transaction, AsyncManagedTransaction)
510524
tx = self._transaction
511525
try:
512526
result = await transaction_function(tx, *args, **kwargs)

src/neo4j/_async/work/transaction.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ async def _exit(self, exception_type, exception_value, traceback):
7474
async def _begin(
7575
self, database, imp_user, bookmarks, access_mode, metadata, timeout,
7676
notifications_min_severity, notifications_disabled_categories,
77+
pipelined=False,
7778
):
7879
self._database = database
7980
self._connection.begin(
@@ -82,8 +83,9 @@ async def _begin(
8283
notifications_min_severity=notifications_min_severity,
8384
notifications_disabled_categories=notifications_disabled_categories
8485
)
85-
await self._error_handling_connection.send_all()
86-
await self._error_handling_connection.fetch_all()
86+
if not pipelined:
87+
await self._error_handling_connection.send_all()
88+
await self._error_handling_connection.fetch_all()
8789

8890
async def _result_on_closed_handler(self):
8991
pass

src/neo4j/_sync/driver.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -859,9 +859,10 @@ def example(driver: neo4j.Driver) -> neo4j.Record::
859859
else:
860860
raise ValueError("Invalid routing control value: %r"
861861
% routing_)
862-
return executor(
863-
_work, query_, parameters, result_transformer_
864-
)
862+
with session._pipelined_begin:
863+
return executor(
864+
_work, query_, parameters, result_transformer_
865+
)
865866

866867
@property
867868
def execute_query_bookmark_manager(self) -> BookmarkManager:

src/neo4j/_sync/work/session.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
deprecated,
3333
PreviewWarning,
3434
)
35+
from ..._util import ContextBool
3536
from ..._work import Query
3637
from ...api import (
3738
Bookmarks,
@@ -100,6 +101,10 @@ class Session(Workspace):
100101
# The state this session is in.
101102
_state_failed = False
102103

104+
_config: SessionConfig
105+
_bookmark_manager: t.Optional[Bookmarks]
106+
_pipelined_begin: ContextBool
107+
103108
def __init__(self, pool, session_config):
104109
assert isinstance(session_config, SessionConfig)
105110
if session_config.auth is not None:
@@ -115,6 +120,7 @@ def __init__(self, pool, session_config):
115120
self._config = session_config
116121
self._initialize_bookmarks(session_config.bookmarks)
117122
self._bookmark_manager = session_config.bookmark_manager
123+
self._pipelined_begin = ContextBool()
118124

119125
def __enter__(self) -> Session:
120126
return self
@@ -421,6 +427,7 @@ def _open_transaction(
421427
bookmarks, access_mode, metadata, timeout,
422428
self._config.notifications_min_severity,
423429
self._config.notifications_disabled_categories,
430+
pipelined=self._pipelined_begin
424431
)
425432

426433
def begin_transaction(
@@ -480,9 +487,15 @@ def begin_transaction(
480487

481488
return t.cast(Transaction, self._transaction)
482489

490+
483491
def _run_transaction(
484-
self, access_mode, transaction_function, *args, **kwargs
485-
):
492+
self,
493+
access_mode: str,
494+
transaction_function: t.Callable[
495+
te.Concatenate[ManagedTransaction, _P], t.Union[_R]
496+
],
497+
*args: _P.args, **kwargs: _P.kwargs
498+
) -> _R:
486499
self._check_state()
487500
if not callable(transaction_function):
488501
raise TypeError("Unit of work is not callable")
@@ -498,7 +511,7 @@ def _run_transaction(
498511

499512
errors = []
500513

501-
t0 = -1 # Timer
514+
t0: float = -1 # Timer
502515

503516
while True:
504517
try:
@@ -507,6 +520,7 @@ def _run_transaction(
507520
access_mode=access_mode, metadata=metadata,
508521
timeout=timeout
509522
)
523+
assert isinstance(self._transaction, ManagedTransaction)
510524
tx = self._transaction
511525
try:
512526
result = transaction_function(tx, *args, **kwargs)

src/neo4j/_sync/work/transaction.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ def _exit(self, exception_type, exception_value, traceback):
7474
def _begin(
7575
self, database, imp_user, bookmarks, access_mode, metadata, timeout,
7676
notifications_min_severity, notifications_disabled_categories,
77+
pipelined=False,
7778
):
7879
self._database = database
7980
self._connection.begin(
@@ -82,8 +83,9 @@ def _begin(
8283
notifications_min_severity=notifications_min_severity,
8384
notifications_disabled_categories=notifications_disabled_categories
8485
)
85-
self._error_handling_connection.send_all()
86-
self._error_handling_connection.fetch_all()
86+
if not pipelined:
87+
self._error_handling_connection.send_all()
88+
self._error_handling_connection.fetch_all()
8789

8890
def _result_on_closed_handler(self):
8991
pass

src/neo4j/_util/__init__.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Copyright (c) "Neo4j"
2+
# Neo4j Sweden AB [https://neo4j.com]
3+
#
4+
# This file is part of Neo4j.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# https://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
18+
19+
from ._context_bool import ContextBool
20+
21+
22+
__all__ = ["ContextBool"]

src/neo4j/_util/_context_bool.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# Copyright (c) "Neo4j"
2+
# Neo4j Sweden AB [https://neo4j.com]
3+
#
4+
# This file is part of Neo4j.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# https://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
18+
19+
from __future__ import annotations
20+
21+
22+
__all__ = ["ContextBool"]
23+
24+
25+
class ContextBool:
26+
def __init__(self) -> None:
27+
self._value = False
28+
29+
def __bool__(self) -> bool:
30+
return self._value
31+
32+
def __enter__(self) -> None:
33+
self._value = True
34+
35+
def __exit__(self, exc_type, exc_value, traceback) -> None:
36+
self._value = False

testkitbackend/test_config.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
"Optimization:AuthPipelining": true,
6666
"Optimization:ConnectionReuse": true,
6767
"Optimization:EagerTransactionBegin": true,
68+
"Optimization:ExecuteQueryPipelining": true,
6869
"Optimization:ImplicitDefaultArguments": true,
6970
"Optimization:MinimalBookmarksSet": true,
7071
"Optimization:MinimalResets": true,

0 commit comments

Comments
 (0)