Skip to content

Commit 986b4b0

Browse files
committed
Improve ordering of connection initialization
Make sure every connection acquired from the connection pool is fully initialized and authenticated. Before this change, a connection was created in a synchronous way. Then protocol handshake and INIT message were sent asynchronously. So creation of a connection looked like a sync operation but it is not. Authentication info was also sent with INIT message regardless of the Bolt protocol negotiation. This worked fine for Bolt V1 and V2 because they used same INIT message. However, for Bolt V3 initialization has to happen after protocol version negotiation because the initialization message is different. Commit also moves error handling to the connection layer. Before, error handling/processing was in both `StreamObserver` and `Transaction`. It's mostly needed to handle failures in RoutingDriver (forget address from the routing table). Now handling of errors will be done in a single place - `Connection` object using a special `ConnectionErrorHandler`. This handler is different for direct driver and routing drivers.
1 parent b2e62e6 commit 986b4b0

27 files changed

+918
-844
lines changed

src/v1/driver.js

+23-22
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,15 @@
1919

2020
import Session from './session';
2121
import Pool from './internal/pool';
22-
import {connect} from './internal/connector';
22+
import Connection from './internal/connection';
2323
import StreamObserver from './internal/stream-observer';
2424
import {newError, SERVICE_UNAVAILABLE} from './error';
2525
import {DirectConnectionProvider} from './internal/connection-providers';
2626
import Bookmark from './internal/bookmark';
2727
import ConnectivityVerifier from './internal/connectivity-verifier';
2828
import PoolConfig, {DEFAULT_ACQUISITION_TIMEOUT, DEFAULT_MAX_SIZE} from './internal/pool-config';
2929
import Logger from './internal/logger';
30+
import ConnectionErrorHandler from './internal/connection-error-handler';
3031

3132
const DEFAULT_MAX_CONNECTION_LIFETIME = 60 * 60 * 1000; // 1 hour
3233

@@ -62,18 +63,18 @@ class Driver {
6263
* @constructor
6364
* @param {string} hostPort
6465
* @param {string} userAgent
65-
* @param {object} token
66+
* @param {object} authToken
6667
* @param {object} config
6768
* @protected
6869
*/
69-
constructor(hostPort, userAgent, token = {}, config = {}) {
70+
constructor(hostPort, userAgent, authToken = {}, config = {}) {
7071
sanitizeConfig(config);
7172

7273
this._id = idGenerator++;
7374
this._hostPort = hostPort;
7475
this._userAgent = userAgent;
7576
this._openConnections = {};
76-
this._token = token;
77+
this._authToken = authToken;
7778
this._config = config;
7879
this._log = Logger.create(config);
7980
this._pool = new Pool(
@@ -127,18 +128,24 @@ class Driver {
127128
}
128129

129130
/**
130-
* Create a new connection instance.
131-
* @return {Connection} new connector-api session instance, a low level session API.
131+
* Create a new connection and initialize it.
132+
* @return {Promise<Connection>} promise resolved with a new connection or rejected when failed to connect.
132133
* @access private
133134
*/
134135
_createConnection(hostPort, release) {
135-
const conn = connect(hostPort, this._config, this._connectionErrorCode(), this._log);
136-
const streamObserver = new _ConnectionStreamObserver(this, conn);
137-
conn.protocol().initialize(this._userAgent, this._token, streamObserver);
138-
conn._release = () => release(hostPort, conn);
139-
140-
this._openConnections[conn.id] = conn;
141-
return conn;
136+
const connection = Connection.create(hostPort, this._config, this._createConnectionErrorHandler(), this._log);
137+
connection._release = () => release(hostPort, connection);
138+
this._openConnections[connection.id] = connection;
139+
140+
return connection.connect(this._userAgent, this._authToken)
141+
.catch(error => {
142+
if (this.onError) {
143+
// notify Driver.onError callback about connection initialization errors
144+
this.onError(error);
145+
}
146+
// propagate the error because connection failed to connect / initialize
147+
throw error;
148+
});
142149
}
143150

144151
/**
@@ -186,7 +193,7 @@ class Driver {
186193
const sessionMode = Driver._validateSessionMode(mode);
187194
const connectionProvider = this._getOrCreateConnectionProvider();
188195
const bookmark = new Bookmark(bookmarkOrBookmarks);
189-
return this._createSession(sessionMode, connectionProvider, bookmark, this._config);
196+
return new Session(sessionMode, connectionProvider, bookmark, this._config);
190197
}
191198

192199
static _validateSessionMode(rawMode) {
@@ -203,14 +210,8 @@ class Driver {
203210
}
204211

205212
// Extension point
206-
_createSession(mode, connectionProvider, bookmark, config) {
207-
return new Session(mode, connectionProvider, bookmark, config);
208-
}
209-
210-
// Extension point
211-
_connectionErrorCode() {
212-
// connection errors might result in different error codes depending on the driver
213-
return SERVICE_UNAVAILABLE;
213+
_createConnectionErrorHandler() {
214+
return new ConnectionErrorHandler(SERVICE_UNAVAILABLE);
214215
}
215216

216217
_getOrCreateConnectionProvider() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/**
2+
* Copyright (c) 2002-2018 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import {SERVICE_UNAVAILABLE, SESSION_EXPIRED} from '../error';
21+
22+
export default class ConnectionErrorHandler {
23+
24+
constructor(errorCode, handleUnavailability, handleWriteFailure) {
25+
this._errorCode = errorCode;
26+
this._handleUnavailability = handleUnavailability || noOpHandler;
27+
this._handleWriteFailure = handleWriteFailure || noOpHandler;
28+
}
29+
30+
/**
31+
* Error code to use for network errors.
32+
* @return {string} the error code.
33+
*/
34+
errorCode() {
35+
return this._errorCode;
36+
}
37+
38+
/**
39+
* Handle and transform the error.
40+
* @param {Neo4jError} error the original error.
41+
* @param {string} hostPort the host and port of the connection where the error happened.
42+
* @return {Neo4jError} new error that should be propagated to the user.
43+
*/
44+
handleAndTransformError(error, hostPort) {
45+
if (isAvailabilityError(error)) {
46+
return this._handleUnavailability(error, hostPort);
47+
}
48+
if (isFailureToWrite(error)) {
49+
return this._handleWriteFailure(error, hostPort);
50+
}
51+
return error;
52+
}
53+
}
54+
55+
function isAvailabilityError(error) {
56+
if (error) {
57+
return error.code === SESSION_EXPIRED ||
58+
error.code === SERVICE_UNAVAILABLE ||
59+
error.code === 'Neo.TransientError.General.DatabaseUnavailable';
60+
}
61+
return false;
62+
}
63+
64+
function isFailureToWrite(error) {
65+
if (error) {
66+
return error.code === 'Neo.ClientError.Cluster.NotALeader' ||
67+
error.code === 'Neo.ClientError.General.ForbiddenOnReadOnlyDatabase';
68+
}
69+
return false;
70+
}
71+
72+
function noOpHandler(error) {
73+
return error;
74+
}

src/v1/internal/connection-holder.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ export default class ConnectionHolder {
5555
getConnection(streamObserver) {
5656
return this._connectionPromise.then(connection => {
5757
streamObserver.resolveConnection(connection);
58-
return connection.initializationCompleted();
58+
return connection;
5959
});
6060
}
6161

src/v1/internal/connection-providers.js

+27-13
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import hasFeature from './features';
2626
import {DnsHostNameResolver, DummyHostNameResolver} from './host-name-resolvers';
2727
import RoutingUtil from './routing-util';
2828

29+
const UNAUTHORIZED_ERROR_CODE = 'Neo.ClientError.Security.Unauthorized';
30+
2931
class ConnectionProvider {
3032

3133
acquireConnection(mode) {
@@ -195,20 +197,32 @@ export class LoadBalancer extends ConnectionProvider {
195197

196198
// try next router
197199
return this._createSessionForRediscovery(currentRouter).then(session => {
198-
return this._rediscovery.lookupRoutingTableOnRouter(session, currentRouter)
200+
if (session) {
201+
return this._rediscovery.lookupRoutingTableOnRouter(session, currentRouter);
202+
} else {
203+
// unable to acquire connection and create session towards the current router
204+
// return null to signal that the next router should be tried
205+
return null;
206+
}
199207
});
200208
});
201209
}, Promise.resolve(null));
202210
}
203211

204212
_createSessionForRediscovery(routerAddress) {
205-
return this._connectionPool.acquire(routerAddress).then(connection => {
206-
// initialized connection is required for routing procedure call
207-
// server version needs to be known to decide which routing procedure to use
208-
const initializedConnectionPromise = connection.initializationCompleted();
209-
const connectionProvider = new SingleConnectionProvider(initializedConnectionPromise);
210-
return new Session(READ, connectionProvider);
211-
});
213+
return this._connectionPool.acquire(routerAddress)
214+
.then(connection => {
215+
const connectionProvider = new SingleConnectionProvider(connection);
216+
return new Session(READ, connectionProvider);
217+
})
218+
.catch(error => {
219+
// unable to acquire connection towards the given router
220+
if (error && error.code === UNAUTHORIZED_ERROR_CODE) {
221+
// auth error is a sign of a configuration issue, rediscovery should not proceed
222+
throw error;
223+
}
224+
return null;
225+
});
212226
}
213227

214228
_applyRoutingTableIfPossible(newRoutingTable) {
@@ -257,14 +271,14 @@ export class LoadBalancer extends ConnectionProvider {
257271

258272
export class SingleConnectionProvider extends ConnectionProvider {
259273

260-
constructor(connectionPromise) {
274+
constructor(connection) {
261275
super();
262-
this._connectionPromise = connectionPromise;
276+
this._connection = connection;
263277
}
264278

265279
acquireConnection(mode) {
266-
const connectionPromise = this._connectionPromise;
267-
this._connectionPromise = null;
268-
return connectionPromise;
280+
const connection = this._connection;
281+
this._connection = null;
282+
return Promise.resolve(connection);
269283
}
270284
}

0 commit comments

Comments
 (0)