Skip to content

Commit cb01e6e

Browse files
robsdedudeMaxAake
andauthored
ADR 031: Homedb Cache (Bolt 5.8) (#1115)
Home database cache/optimistic routing (Bolt 5.8) Introducing support for Bolt 5.8, which (among other things) makes the server echo back the resolved home/default db on starting a transaction as well as a connection hint whether server-side routing (SSR) is enabled. If so, the echoed db can be used together with an optimistic home db cache to guess a client-side route (optimistic routing - falling back to SSR on wrong guesses). This feature saves a round-trip under the following conditions (*all* must be fulfilled): - Server has SSR enabled - Sever supports Bolt 5.8 - A fresh (yet unused) session without an explicitly configured target database starts its first transaction (auto-commit or explicit) Co-authored-by: MaxAake <[email protected]>
1 parent e66ff23 commit cb01e6e

File tree

87 files changed

+5245
-515
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

87 files changed

+5245
-515
lines changed

.gitattributes

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# configure github not to display generated files
22
/src/neo4j/_sync/** linguist-generated=true
3-
/tests/unit/sync_/** linguist-generated=true
4-
/tests/integration/sync_/** linguist-generated=true
3+
/tests/unit/sync/** linguist-generated=true
4+
/tests/integration/sync/** linguist-generated=true
55
/testkitbackend/_sync/** linguist-generated=true

docs/source/api.rst

+3-2
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,8 @@ Closing a driver will immediately shut down all connections in the pool.
260260
:param database\_:
261261
Database to execute the query against.
262262

263-
None (default) uses the database configured on the server side.
263+
:data:`None` (default) uses the database configured on the server
264+
side.
264265

265266
.. Note::
266267
It is recommended to always specify the database explicitly
@@ -1034,7 +1035,7 @@ Specifically, the following applies:
10341035
all queries within that session are executed with the explicit database
10351036
name 'movies' supplied. Any change to the user’s home database is
10361037
reflected only in sessions created after such change takes effect. This
1037-
behavior requires additional network communication. In clustered
1038+
behavior may require additional network communication. In clustered
10381039
environments, it is strongly recommended to avoid a single point of
10391040
failure. For instance, by ensuring that the connection URI resolves to
10401041
multiple endpoints. For older Bolt protocol versions the behavior is the

docs/source/async_api.rst

+2-1
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,8 @@ Closing a driver will immediately shut down all connections in the pool.
247247
:param database\_:
248248
Database to execute the query against.
249249

250-
None (default) uses the database configured on the server side.
250+
:data:`None` (default) uses the database configured on the server
251+
side.
251252

252253
.. Note::
253254
It is recommended to always specify the database explicitly

src/neo4j/_async/driver.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -799,7 +799,8 @@ async def example(driver: neo4j.AsyncDriver) -> int:
799799
:param database_:
800800
Database to execute the query against.
801801
802-
None (default) uses the database configured on the server side.
802+
:data:`None` (default) uses the database configured on the server
803+
side.
803804
804805
.. Note::
805806
It is recommended to always specify the database explicitly

src/neo4j/_async/home_db_cache.py

+150
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
# Copyright (c) "Neo4j"
2+
# Neo4j Sweden AB [https://neo4j.com]
3+
#
4+
# This file is part of Neo4j.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# https://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
18+
19+
from __future__ import annotations
20+
21+
import math
22+
import typing as t
23+
from time import monotonic
24+
25+
from .._async_compat.concurrency import AsyncCooperativeLock
26+
27+
28+
if t.TYPE_CHECKING:
29+
import typing_extensions as te
30+
31+
TKey: te.TypeAlias = t.Union[
32+
str,
33+
t.Tuple[t.Tuple[str, t.Hashable], ...],
34+
t.Tuple[None],
35+
]
36+
TVal: te.TypeAlias = t.Tuple[float, str]
37+
38+
39+
class AsyncHomeDbCache:
40+
_ttl: float
41+
_enabled: bool
42+
_max_size: int | None
43+
44+
def __init__(
45+
self,
46+
enabled: bool = True,
47+
ttl: float = float("inf"),
48+
max_size: int | None = None,
49+
) -> None:
50+
if math.isnan(ttl) or ttl <= 0:
51+
raise ValueError(f"home db cache ttl must be greater 0, got {ttl}")
52+
self._enabled = enabled
53+
self._ttl = ttl
54+
self._cache: dict[TKey, TVal] = {}
55+
self._lock = AsyncCooperativeLock()
56+
self._oldest_entry = monotonic()
57+
if max_size is not None and max_size <= 0:
58+
raise ValueError(
59+
f"home db cache max_size must be greater 0 or None, "
60+
f"got {max_size}"
61+
)
62+
self._max_size = max_size
63+
self._truncate_size = (
64+
min(max_size, int(0.01 * max_size * math.log(max_size)))
65+
if max_size is not None
66+
else None
67+
)
68+
69+
def compute_key(
70+
self,
71+
imp_user: str | None,
72+
auth: dict | None,
73+
) -> TKey:
74+
if not self._enabled:
75+
return (None,)
76+
if imp_user is not None:
77+
return imp_user
78+
if auth is not None:
79+
return _consolidate_auth_token(auth)
80+
return (None,)
81+
82+
def get(self, key: TKey) -> str | None:
83+
if not self._enabled:
84+
return None
85+
with self._lock:
86+
self._clean(monotonic())
87+
val = self._cache.get(key)
88+
if val is None:
89+
return None
90+
return val[1]
91+
92+
def set(self, key: TKey, value: str | None) -> None:
93+
if not self._enabled:
94+
return
95+
with self._lock:
96+
now = monotonic()
97+
self._clean(now)
98+
if value is None:
99+
self._cache.pop(key, None)
100+
else:
101+
self._cache[key] = (now, value)
102+
103+
def clear(self) -> None:
104+
if not self._enabled:
105+
return
106+
with self._lock:
107+
self._cache = {}
108+
self._oldest_entry = monotonic()
109+
110+
def _clean(self, now: float | None = None) -> None:
111+
now = monotonic() if now is None else now
112+
if now - self._oldest_entry > self._ttl:
113+
self._cache = {
114+
k: v
115+
for k, v in self._cache.items()
116+
if now - v[0] < self._ttl * 0.9
117+
}
118+
self._oldest_entry = min(
119+
(v[0] for v in self._cache.values()), default=now
120+
)
121+
if self._max_size and len(self._cache) > self._max_size:
122+
self._cache = dict(
123+
sorted(
124+
self._cache.items(),
125+
key=lambda item: item[1][0],
126+
reverse=True,
127+
)[: self._truncate_size]
128+
)
129+
130+
def __len__(self) -> int:
131+
return len(self._cache)
132+
133+
@property
134+
def enabled(self) -> bool:
135+
return self._enabled
136+
137+
138+
def _consolidate_auth_token(auth: dict) -> tuple | str:
139+
if auth.get("scheme") == "basic" and isinstance(
140+
auth.get("principal"), str
141+
):
142+
return auth["principal"]
143+
return _hashable_dict(auth)
144+
145+
146+
def _hashable_dict(d: dict) -> tuple:
147+
return tuple(
148+
(k, _hashable_dict(v) if isinstance(v, dict) else v)
149+
for k, v in sorted(d.items())
150+
)

src/neo4j/_async/io/__init__.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
"""
2323

2424
__all__ = [
25-
"AcquireAuth",
25+
"AcquisitionAuth",
26+
"AcquisitionDatabase",
2627
"AsyncBolt",
2728
"AsyncBoltPool",
2829
"AsyncNeo4jPool",
@@ -37,7 +38,8 @@
3738
ConnectionErrorHandler,
3839
)
3940
from ._pool import (
40-
AcquireAuth,
41+
AcquisitionAuth,
42+
AcquisitionDatabase,
4143
AsyncBoltPool,
4244
AsyncNeo4jPool,
4345
)

src/neo4j/_async/io/_bolt.py

+14-25
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
from ..._async_compat.network import AsyncBoltSocket
2727
from ..._async_compat.util import AsyncUtil
28+
from ..._auth_management import to_auth_dict
2829
from ..._codec.hydration import (
2930
HydrationHandlerABC,
3031
v1 as hydration_v1,
@@ -39,12 +40,10 @@
3940
from ..._sync.config import PoolConfig
4041
from ...addressing import ResolvedAddress
4142
from ...api import (
42-
Auth,
4343
ServerInfo,
4444
Version,
4545
)
4646
from ...exceptions import (
47-
AuthError,
4847
ConfigurationError,
4948
DriverError,
5049
IncompleteCommit,
@@ -158,10 +157,7 @@ def __init__(
158157
),
159158
self.PROTOCOL_VERSION,
160159
)
161-
# so far `connection.recv_timeout_seconds` is the only available
162-
# configuration hint that exists. Therefore, all hints can be stored at
163-
# connection level. This might change in the future.
164-
self.configuration_hints = {}
160+
self.connection_hints = {}
165161
self.patch = {}
166162
self.outbox = AsyncOutbox(
167163
self.socket,
@@ -187,7 +183,7 @@ def __init__(
187183
self.user_agent = USER_AGENT
188184

189185
self.auth = auth
190-
self.auth_dict = self._to_auth_dict(auth)
186+
self.auth_dict = to_auth_dict(auth)
191187
self.auth_manager = auth_manager
192188
self.telemetry_disabled = telemetry_disabled
193189

@@ -206,26 +202,14 @@ def _get_server_state_manager(self) -> ServerStateManagerBase: ...
206202
@abc.abstractmethod
207203
def _get_client_state_manager(self) -> ClientStateManagerBase: ...
208204

209-
@classmethod
210-
def _to_auth_dict(cls, auth):
211-
# Determine auth details
212-
if not auth:
213-
return {}
214-
elif isinstance(auth, tuple) and 2 <= len(auth) <= 3:
215-
return vars(Auth("basic", *auth))
216-
else:
217-
try:
218-
return vars(auth)
219-
except (KeyError, TypeError) as e:
220-
# TODO: 6.0 - change this to be a DriverError (or subclass)
221-
raise AuthError(
222-
f"Cannot determine auth details from {auth!r}"
223-
) from e
224-
225205
@property
226206
def connection_id(self):
227207
return self.server_info._metadata.get("connection_id", "<unknown id>")
228208

209+
@property
210+
@abc.abstractmethod
211+
def ssr_enabled(self) -> bool: ...
212+
229213
@property
230214
@abc.abstractmethod
231215
def supports_multiple_results(self):
@@ -308,6 +292,7 @@ def protocol_handlers(cls, protocol_version=None):
308292
AsyncBolt5x5,
309293
AsyncBolt5x6,
310294
AsyncBolt5x7,
295+
AsyncBolt5x8,
311296
)
312297

313298
handlers = {
@@ -325,6 +310,7 @@ def protocol_handlers(cls, protocol_version=None):
325310
AsyncBolt5x5.PROTOCOL_VERSION: AsyncBolt5x5,
326311
AsyncBolt5x6.PROTOCOL_VERSION: AsyncBolt5x6,
327312
AsyncBolt5x7.PROTOCOL_VERSION: AsyncBolt5x7,
313+
AsyncBolt5x8.PROTOCOL_VERSION: AsyncBolt5x8,
328314
}
329315

330316
if protocol_version is None:
@@ -461,7 +447,10 @@ async def open(
461447

462448
# avoid new lines after imports for better readability and conciseness
463449
# fmt: off
464-
if protocol_version == (5, 7):
450+
if protocol_version == (5, 8):
451+
from ._bolt5 import AsyncBolt5x8
452+
bolt_cls = AsyncBolt5x8
453+
elif protocol_version == (5, 7):
465454
from ._bolt5 import AsyncBolt5x7
466455
bolt_cls = AsyncBolt5x7
467456
elif protocol_version == (5, 6):
@@ -626,7 +615,7 @@ def re_auth(
626615
627616
:returns: whether the auth was changed
628617
"""
629-
new_auth_dict = self._to_auth_dict(auth)
618+
new_auth_dict = to_auth_dict(auth)
630619
if not force and new_auth_dict == self.auth_dict:
631620
self.auth_manager = auth_manager
632621
self.auth = auth

src/neo4j/_async/io/_bolt3.py

+2
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,8 @@ class AsyncBolt3(AsyncBolt):
148148

149149
PROTOCOL_VERSION = Version(3, 0)
150150

151+
ssr_enabled = False
152+
151153
supports_multiple_results = False
152154

153155
supports_multiple_databases = False

src/neo4j/_async/io/_bolt4.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ class AsyncBolt4x0(AsyncBolt):
6464

6565
PROTOCOL_VERSION = Version(4, 0)
6666

67+
ssr_enabled = False
68+
6769
supports_multiple_results = True
6870

6971
supports_multiple_databases = True
@@ -614,10 +616,10 @@ async def hello(self, dehydration_hooks=None, hydration_hooks=None):
614616
)
615617

616618
def on_success(metadata):
617-
self.configuration_hints.update(metadata.pop("hints", {}))
619+
self.connection_hints.update(metadata.pop("hints", {}))
618620
self.server_info.update(metadata)
619-
if "connection.recv_timeout_seconds" in self.configuration_hints:
620-
recv_timeout = self.configuration_hints[
621+
if "connection.recv_timeout_seconds" in self.connection_hints:
622+
recv_timeout = self.connection_hints[
621623
"connection.recv_timeout_seconds"
622624
]
623625
if isinstance(recv_timeout, int) and recv_timeout > 0:

0 commit comments

Comments
 (0)