diff --git a/packages/bolt-connection/src/connection-provider/connection-provider-direct.js b/packages/bolt-connection/src/connection-provider/connection-provider-direct.js index 1fa74947a..7822cded6 100644 --- a/packages/bolt-connection/src/connection-provider/connection-provider-direct.js +++ b/packages/bolt-connection/src/connection-provider/connection-provider-direct.js @@ -24,6 +24,7 @@ import { ConnectionErrorHandler } from '../connection' import { internal, error } from 'neo4j-driver-core' +import { controller } from '../lang' const { constants: { BOLT_PROTOCOL_V3, BOLT_PROTOCOL_V4_0, BOLT_PROTOCOL_V4_4 } @@ -49,12 +50,17 @@ export default class DirectConnectionProvider extends PooledConnectionProvider { this._handleAuthorizationExpired(error, address, database) }) - return this._connectionPool - .acquire(this._address) - .then( - connection => - new DelegateConnection(connection, databaseSpecificErrorHandler) - ) + const acquireConnectionJob = { + run: () => this._connectionPool + .acquire(this._address) + .then( + connection => + new DelegateConnection(connection, databaseSpecificErrorHandler) + ), + onTimeout: connection => connection._release() + } + + return controller.runWithTimeout(this._sessionConnectionTimeoutConfig, acquireConnectionJob) } _handleAuthorizationExpired (error, address, database) { diff --git a/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js b/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js index 8870566b0..c92520859 100644 --- a/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js +++ b/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js @@ -19,7 +19,7 @@ import { createChannelConnection, ConnectionErrorHandler } from '../connection' import Pool, { PoolConfig } from '../pool' -import { error, ConnectionProvider } from 'neo4j-driver-core' +import { error, newError, ConnectionProvider } from 'neo4j-driver-core' const { SERVICE_UNAVAILABLE } = error export default class PooledConnectionProvider extends ConnectionProvider { @@ -58,6 +58,12 @@ export default class PooledConnectionProvider extends ConnectionProvider { log: this._log }) this._openConnections = {} + this._sessionConnectionTimeoutConfig = { + timeout: this._config.sessionConnectionTimeout, + reason: () => newError( + `Session acquisition timed out in ${this._config.sessionConnectionTimeout} ms.` + ) + } } _createConnectionErrorHandler () { diff --git a/packages/bolt-connection/src/connection-provider/connection-provider-routing.js b/packages/bolt-connection/src/connection-provider/connection-provider-routing.js index 0dbe88b26..561b5165d 100644 --- a/packages/bolt-connection/src/connection-provider/connection-provider-routing.js +++ b/packages/bolt-connection/src/connection-provider/connection-provider-routing.js @@ -23,6 +23,7 @@ import { HostNameResolver } from '../channel' import SingleConnectionProvider from './connection-provider-single' import PooledConnectionProvider from './connection-provider-pooled' import { LeastConnectedLoadBalancingStrategy } from '../load-balancing' +import { controller } from '../lang' import { createChannelConnection, ConnectionErrorHandler, @@ -70,6 +71,13 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider ) }) + this._updateRoutingTableTimeoutConfig = { + timeout: this._config.updateRoutingTableTimeout, + reason: () => newError( + `Routing table update timed out in ${this._config.updateRoutingTableTimeout} ms.` + ) + } + this._routingContext = { ...routingContext, address: address.toString() } this._seedRouter = address this._rediscovery = new Rediscovery(this._routingContext) @@ -137,53 +145,66 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider this._handleAuthorizationExpired(error, address, context.database) ) - const routingTable = await this._freshRoutingTable({ - accessMode, - database: context.database, - bookmark: bookmarks, - impersonatedUser, - onDatabaseNameResolved: (databaseName) => { - context.database = context.database || databaseName - if (onDatabaseNameResolved) { - onDatabaseNameResolved(databaseName) - } - } - }) + const refreshRoutingTableJob = { + run: async (_, cancelationToken) => { + const routingTable = await this._freshRoutingTable({ + accessMode, + database: context.database, + bookmarks: bookmarks, + impersonatedUser, + onDatabaseNameResolved: (databaseName) => { + context.database = context.database || databaseName + if (onDatabaseNameResolved) { + onDatabaseNameResolved(databaseName) + } + }, + cancelationToken + }) - // select a target server based on specified access mode - if (accessMode === READ) { - address = this._loadBalancingStrategy.selectReader(routingTable.readers) - name = 'read' - } else if (accessMode === WRITE) { - address = this._loadBalancingStrategy.selectWriter(routingTable.writers) - name = 'write' - } else { - throw newError('Illegal mode ' + accessMode) - } + // select a target server based on specified access mode + if (accessMode === READ) { + address = this._loadBalancingStrategy.selectReader(routingTable.readers) + name = 'read' + } else if (accessMode === WRITE) { + address = this._loadBalancingStrategy.selectWriter(routingTable.writers) + name = 'write' + } else { + throw newError('Illegal mode ' + accessMode) + } - // we couldn't select a target server - if (!address) { - throw newError( - `Failed to obtain connection towards ${name} server. Known routing table is: ${routingTable}`, - SESSION_EXPIRED - ) + // we couldn't select a target server + if (!address) { + throw newError( + `Failed to obtain connection towards ${name} server. Known routing table is: ${routingTable}`, + SESSION_EXPIRED + ) + } + return { routingTable, address } + } } - try { - const connection = await this._acquireConnectionToServer( - address, - name, - routingTable - ) + const acquireConnectionJob = { + run: async ({ routingTable, address }) => { + try { + const connection = await this._acquireConnectionToServer( + address, + name, + routingTable + ) - return new DelegateConnection(connection, databaseSpecificErrorHandler) - } catch (error) { - const transformed = databaseSpecificErrorHandler.handleAndTransformError( - error, - address - ) - throw transformed + return new DelegateConnection(connection, databaseSpecificErrorHandler) + } catch (error) { + const transformed = databaseSpecificErrorHandler.handleAndTransformError( + error, + address + ) + throw transformed + } + }, + onTimeout: connection => connection._release() } + + return controller.runWithTimeout(this._sessionConnectionTimeoutConfig, refreshRoutingTableJob, acquireConnectionJob) } async _hasProtocolVersion (versionPredicate) { @@ -259,48 +280,57 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider return this._connectionPool.acquire(address) } - _freshRoutingTable ({ accessMode, database, bookmark, impersonatedUser, onDatabaseNameResolved } = {}) { - const currentRoutingTable = this._routingTableRegistry.get( - database, - () => new RoutingTable({ database }) - ) + _freshRoutingTable ({ accessMode, database, bookmarks, impersonatedUser, onDatabaseNameResolved, cancelationToken = new controller.CancelationToken(() => false) } = {}) { + const refreshRoutingTableJob = { + run: (_, refreshCancelationToken) => { + const combinedCancelationToken = refreshCancelationToken.combine(cancelationToken) + const currentRoutingTable = this._routingTableRegistry.get( + database, + () => new RoutingTable({ database }) + ) - if (!currentRoutingTable.isStaleFor(accessMode)) { - return currentRoutingTable + if (!currentRoutingTable.isStaleFor(accessMode)) { + return currentRoutingTable + } + this._log.info( + `Routing table is stale for database: "${database}" and access mode: "${accessMode}": ${currentRoutingTable}` + ) + return this._refreshRoutingTable(currentRoutingTable, bookmarks, impersonatedUser, onDatabaseNameResolved, combinedCancelationToken) + } } - this._log.info( - `Routing table is stale for database: "${database}" and access mode: "${accessMode}": ${currentRoutingTable}` - ) - return this._refreshRoutingTable(currentRoutingTable, bookmark, impersonatedUser, onDatabaseNameResolved) + return controller.runWithTimeout(this._updateRoutingTableTimeoutConfig, refreshRoutingTableJob) } - _refreshRoutingTable (currentRoutingTable, bookmark, impersonatedUser, onDatabaseNameResolved) { + _refreshRoutingTable (currentRoutingTable, bookmarks, impersonatedUser, onDatabaseNameResolved, cancelationToken) { const knownRouters = currentRoutingTable.routers if (this._useSeedRouter) { return this._fetchRoutingTableFromSeedRouterFallbackToKnownRouters( knownRouters, currentRoutingTable, - bookmark, + bookmarks, impersonatedUser, - onDatabaseNameResolved + onDatabaseNameResolved, + cancelationToken ) } return this._fetchRoutingTableFromKnownRoutersFallbackToSeedRouter( knownRouters, currentRoutingTable, - bookmark, + bookmarks, impersonatedUser, - onDatabaseNameResolved + onDatabaseNameResolved, + cancelationToken ) } async _fetchRoutingTableFromSeedRouterFallbackToKnownRouters ( knownRouters, currentRoutingTable, - bookmark, + bookmarks, impersonatedUser, - onDatabaseNameResolved + onDatabaseNameResolved, + cancelationToken ) { // we start with seed router, no routers were probed before const seenRouters = [] @@ -308,8 +338,9 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider seenRouters, this._seedRouter, currentRoutingTable, - bookmark, - impersonatedUser + bookmarks, + impersonatedUser, + cancelationToken ) if (newRoutingTable) { @@ -319,8 +350,9 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider newRoutingTable = await this._fetchRoutingTableUsingKnownRouters( knownRouters, currentRoutingTable, - bookmark, - impersonatedUser + bookmarks, + impersonatedUser, + cancelationToken ) } @@ -334,15 +366,17 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider async _fetchRoutingTableFromKnownRoutersFallbackToSeedRouter ( knownRouters, currentRoutingTable, - bookmark, + bookmarks, impersonatedUser, - onDatabaseNameResolved + onDatabaseNameResolved, + cancelationToken ) { let newRoutingTable = await this._fetchRoutingTableUsingKnownRouters( knownRouters, currentRoutingTable, - bookmark, - impersonatedUser + bookmarks, + impersonatedUser, + cancelationToken ) if (!newRoutingTable) { @@ -351,8 +385,9 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider knownRouters, this._seedRouter, currentRoutingTable, - bookmark, - impersonatedUser + bookmarks, + impersonatedUser, + cancelationToken ) } @@ -366,14 +401,16 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider async _fetchRoutingTableUsingKnownRouters ( knownRouters, currentRoutingTable, - bookmark, - impersonatedUser + bookmarks, + impersonatedUser, + cancelationToken ) { const newRoutingTable = await this._fetchRoutingTable( knownRouters, currentRoutingTable, - bookmark, - impersonatedUser + bookmarks, + impersonatedUser, + cancelationToken ) if (newRoutingTable) { @@ -397,17 +434,20 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider seenRouters, seedRouter, routingTable, - bookmark, - impersonatedUser + bookmarks, + impersonatedUser, + cancelationToken ) { const resolvedAddresses = await this._resolveSeedRouter(seedRouter) + cancelationToken.throwIfCancellationRequested() + // filter out all addresses that we've already tried const newAddresses = resolvedAddresses.filter( address => seenRouters.indexOf(address) < 0 ) - return await this._fetchRoutingTable(newAddresses, routingTable, bookmark, impersonatedUser) + return await this._fetchRoutingTable(newAddresses, routingTable, bookmarks, impersonatedUser, cancelationToken) } async _resolveSeedRouter (seedRouter) { @@ -419,7 +459,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider return [].concat.apply([], dnsResolvedAddresses) } - _fetchRoutingTable (routerAddresses, routingTable, bookmark, impersonatedUser) { + async _fetchRoutingTable (routerAddresses, routingTable, bookmarks, impersonatedUser, cancelationToken) { return routerAddresses.reduce( async (refreshedTablePromise, currentRouter, currentIndex) => { const newRoutingTable = await refreshedTablePromise @@ -441,7 +481,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider // try next router const session = await this._createSessionForRediscovery( currentRouter, - bookmark, + bookmarks, impersonatedUser ) if (session) { @@ -453,6 +493,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider impersonatedUser ) } catch (error) { + cancelationToken.throwIfCancellationRequested() if (error && error.code === DATABASE_NOT_FOUND_ERROR_CODE) { // not finding the target database is a sign of a configuration issue throw error @@ -462,9 +503,10 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider ) return null } finally { - session.close() + await session.close() } } else { + cancelationToken.throwIfCancellationRequested() // unable to acquire connection and create session towards the current router // return null to signal that the next router should be tried return null diff --git a/packages/bolt-connection/src/lang/controller.js b/packages/bolt-connection/src/lang/controller.js new file mode 100644 index 000000000..63b3fcad0 --- /dev/null +++ b/packages/bolt-connection/src/lang/controller.js @@ -0,0 +1,125 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * The cancel operation error + */ +export class OperationCanceledError extends Error { + constructor () { + super() + this.name = OperationCanceledError.name + } +} + +/** + * The CancelationToken used by `runWithTimeout` for cancel an working job. + */ +export class CancelationToken { + constructor (getCancelationRequested) { + this._getCancelationRequested = getCancelationRequested + Object.freeze(this) + } + + /** + * If it receive a cancelation request + */ + get isCancelationRequested () { + return this._getCancelationRequested() + } + + /** + * Combine two cancelations token in one. + * + * The new cancelation token will be canceled if one of the + * token get canceled. + * + * @param {CancelationToken} cancelationToken The other cancelation token + * @returns {CancelationToken} Combined cancelation toke + */ + combine (cancelationToken) { + return new CancelationToken(() => + this.isCancelationRequested === true || cancelationToken.isCancelationRequested === true) + } + + /** + * + * @param {Error} [error] the error to be thrown. Be default: OperationCanceledError will be thrown + * @throws {OperationCanceledError|Error} if a cancelation request was done + * @returns {void} + */ + throwIfCancellationRequested (error) { + if (this.isCancelationRequested) { + if (error != null) { + throw error + } + throw new OperationCanceledError() + } + } +} + +/** + * @typedef {Object} Job + * @property {function(any, CancelationToken)} run method called for run the job + * @property {function(any)} [onTimeout] method called after job finished and the controller has timeout. + * Useful for cleanups. + */ +/** + * @param {any} param0 + * @param {number} param0.timeout The timeout time + * @param {Error} param0.reason The reason for the timeout + * @param {...Job} jobs The jobs to be run in sequence + * @returns {Promise} The result of all the jobs or a timeout failure + */ +export function runWithTimeout ({ timeout, reason }, ...jobs) { + const status = { timedout: false } + const cancelationToken = new CancelationToken(() => status.timedout) + async function _run (currentValue, { resolve, reject }, myJobs) { + const [{ run, onTimeout = () => Promise.resolve() }, ...otherJobs] = myJobs + try { + const value = await run(currentValue, cancelationToken) + if (status.timedout) { + await onTimeout(value).catch(() => {}) + } else if (otherJobs.length === 0) { + resolve(value) + } else { + await _run(value, { resolve, reject }, otherJobs) + } + } catch (e) { + if (!status.timedout) { + reject(e) + } + } + } + + return new Promise((resolve, reject) => { + if (timeout != null) { + status.timeoutHandle = setTimeout(() => { + status.timedout = true + reject(reason()) + }, timeout) + } + + _run(undefined, { resolve, reject }, jobs) + .finally(() => { + if (status.timeoutHandle != null) { + clearTimeout(status.timeoutHandle) + } + }) + }) +} diff --git a/packages/bolt-connection/src/lang/index.js b/packages/bolt-connection/src/lang/index.js new file mode 100644 index 000000000..41ec4ac4b --- /dev/null +++ b/packages/bolt-connection/src/lang/index.js @@ -0,0 +1,20 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export * as controller from './controller' diff --git a/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js b/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js index f2b33546a..2eadbc40c 100644 --- a/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js +++ b/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js @@ -21,6 +21,7 @@ import DirectConnectionProvider from '../../src/connection-provider/connection-p import { Pool } from '../../src/pool' import { Connection, DelegateConnection } from '../../src/connection' import { internal, newError } from 'neo4j-driver-core' +import testUtils from '../test-utils' const { serverAddress: { ServerAddress }, @@ -96,53 +97,115 @@ describe('#unit DirectConnectionProvider', () => { expect(error).toBe(expectedError) }) -}) - -it('should purge connections for address when TokenExpired happens', async () => { - const address = ServerAddress.fromUrl('localhost:123') - const pool = newPool() - jest.spyOn(pool, 'purge') - const connectionProvider = newDirectConnectionProvider(address, pool) - const conn = await connectionProvider.acquireConnection({ - accessMode: 'READ', - database: '' + it('should purge connections for address when TokenExpired happens', async () => { + const address = ServerAddress.fromUrl('localhost:123') + const pool = newPool() + jest.spyOn(pool, 'purge') + const connectionProvider = newDirectConnectionProvider(address, pool) + + const conn = await connectionProvider.acquireConnection({ + accessMode: 'READ', + database: '' + }) + + const error = newError( + 'Message', + 'Neo.ClientError.Security.TokenExpired' + ) + + conn.handleAndTransformError(error, address) + + expect(pool.purge).toHaveBeenCalledWith(address) }) - - const error = newError( - 'Message', - 'Neo.ClientError.Security.TokenExpired' - ) - - conn.handleAndTransformError(error, address) - - expect(pool.purge).toHaveBeenCalledWith(address) -}) - -it('should not change error when TokenExpired happens', async () => { - const address = ServerAddress.fromUrl('localhost:123') - const pool = newPool() - const connectionProvider = newDirectConnectionProvider(address, pool) - - const conn = await connectionProvider.acquireConnection({ - accessMode: 'READ', - database: '' + + it('should not change error when TokenExpired happens', async () => { + const address = ServerAddress.fromUrl('localhost:123') + const pool = newPool() + const connectionProvider = newDirectConnectionProvider(address, pool) + + const conn = await connectionProvider.acquireConnection({ + accessMode: 'READ', + database: '' + }) + + const expectedError = newError( + 'Message', + 'Neo.ClientError.Security.TokenExpired' + ) + + const error = conn.handleAndTransformError(expectedError, address) + + expect(error).toBe(expectedError) + }) + + describe('config.sessionConnectionTimeout', () => { + describe('when connection is acquired in time', () => { + let connectionPromise + let address + let pool + + beforeEach(() => { + address = ServerAddress.fromUrl('localhost:123') + pool = newPool() + const connectionProvider = newDirectConnectionProvider(address, pool, { + sessionConnectionTimeout: 120000 + }) + + connectionPromise = connectionProvider + .acquireConnection({ accessMode: 'READ', database: '' }) + }) + + it('should resolve with the connection', async () => { + const connection = await connectionPromise + + expect(connection).toBeDefined() + expect(connection.address).toEqual(address) + expect(pool.has(address)).toBeTruthy() + }) + }) + + describe('when connection is not acquired in time', () => { + let connectionPromise + let address + let pool + let seenConnections + + beforeEach(() => { + seenConnections = [] + address = ServerAddress.fromUrl('localhost:123') + pool = newPool({ delay: 1000, seenConnections }) + const connectionProvider = newDirectConnectionProvider(address, pool, { + sessionConnectionTimeout: 500 + }) + + connectionPromise = connectionProvider + .acquireConnection({ accessMode: 'READ', database: '' }) + }) + + it('should reject with Session acquisition timeout error', async () => { + await expect(connectionPromise).rejects.toThrowError(newError( + 'Session acquisition timed out in 500 ms.' + )) + }) + + it('should return the connection back to the pool', async () => { + await expect(connectionPromise).rejects.toThrow() + // wait for connection be released to the pool + await testUtils.wait(600) + + expect(seenConnections.length).toBe(1) + expect(seenConnections[0]._release).toBeCalledTimes(1) + expect(pool.has(address)).toBe(true) + }) + }) }) - - const expectedError = newError( - 'Message', - 'Neo.ClientError.Security.TokenExpired' - ) - - const error = conn.handleAndTransformError(expectedError, address) - - expect(error).toBe(expectedError) }) -function newDirectConnectionProvider (address, pool) { +function newDirectConnectionProvider (address, pool, config) { const connectionProvider = new DirectConnectionProvider({ id: 0, - config: {}, + config: { ...config }, log: Logger.noOp(), address: address }) @@ -150,10 +213,18 @@ function newDirectConnectionProvider (address, pool) { return connectionProvider } -function newPool () { +function newPool ({ create, config, delay, seenConnections = [] } = {}) { + const _create = (address, release) => { + const connection = create != null ? create(address, release) : new FakeConnection(address, release) + seenConnections.push(connection) + return connection + } return new Pool({ - create: (address, release) => - Promise.resolve(new FakeConnection(address, release)) + config, + create: async (address, release) => { + await testUtils.wait(delay) + return _create(address, release) + } }) } @@ -163,6 +234,7 @@ class FakeConnection extends Connection { this._address = address this.release = release + this._release = jest.fn(() => release(address, this)) } get address () { diff --git a/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js b/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js index 509f5c8e8..d6150ef53 100644 --- a/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js +++ b/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js @@ -30,6 +30,7 @@ import { Pool } from '../../src/pool' import SimpleHostNameResolver from '../../src/channel/browser/browser-host-name-resolver' import RoutingConnectionProvider from '../../src/connection-provider/connection-provider-routing' import { DelegateConnection, Connection } from '../../src/connection' +import testUtils from '../test-utils' const { serverAddress: { ServerAddress }, @@ -39,6 +40,8 @@ const { const { SERVICE_UNAVAILABLE, SESSION_EXPIRED } = error const READ = 'READ' const WRITE = 'WRITE' +const defaultSeedRouter = ServerAddress.fromUrl('server-non-existing-seed-router') + describe('#unit RoutingConnectionProvider', () => { const server0 = ServerAddress.fromUrl('server0') @@ -69,8 +72,8 @@ describe('#unit RoutingConnectionProvider', () => { const serverDD = ServerAddress.fromUrl('serverDD') const serverEE = ServerAddress.fromUrl('serverEE') - const serverABC = ServerAddress.fromUrl('serverABC') - + const serverABC = ServerAddress.fromUrl('serverABC') + const usersDataSet = [ [null], [undefined], @@ -2472,22 +2475,481 @@ describe('#unit RoutingConnectionProvider', () => { expect(onDatabaseNameResolved).toHaveBeenCalledWith('databaseA') }) + }) + + describe('config.sessionConnectionTimeout', () => { + describe('when connection is acquired in time', () => { + let connectionPromise + let address + let pool + + beforeEach(() => { + address = server3 + const routingTable = newRoutingTable( + null, + [server1], + [server3], + [server5] + ) + pool = newPool() + const routerToRoutingTable = { + null: routingTable + } + const rediscovery = new FakeRediscovery({ + null: { + // Seed Router + [defaultSeedRouter.asKey()]: routingTable + } + }) + + const connectionProvider = newRoutingConnectionProviderWithFakeRediscovery( + rediscovery, + pool, + routerToRoutingTable, + { + sessionConnectionTimeout: 500 + } + ) + + connectionPromise = connectionProvider + .acquireConnection({ accessMode: 'READ', database: '' }) + }) + + it('should resolve with the connection', async () => { + const connection = await connectionPromise + + expect(connection).toBeDefined() + expect(connection.address).toEqual(address) + expect(pool.has(address)).toBeTruthy() + }) + }) + + describe('when connection is not acquired in time', () => { + let connectionPromise + let address + let pool + let seenConnections + + beforeEach(() => { + seenConnections = [] + address = server3 + const routingTable = newRoutingTable( + null, + [server1], + [server3], + [server5] + ) + pool = newPool({ delay: 350, seenConnections }) + const routerToRoutingTable = { + null: routingTable + } + const rediscovery = new FakeRediscovery({ + null: { + // Seed Router + [defaultSeedRouter.asKey()]: routingTable + } + }) + + const connectionProvider = newRoutingConnectionProviderWithFakeRediscovery( + rediscovery, + pool, + routerToRoutingTable, + { + sessionConnectionTimeout: 600 + } + ) + + connectionPromise = connectionProvider + .acquireConnection({ accessMode: 'READ', database: '' }) + }) + + it('should reject with Session acquisition timeout error', async () => { + await expect(connectionPromise).rejects.toThrowError(newError( + 'Session acquisition timed out in 600 ms.' + )) + }) + + it('should return the connection back to the pool', async () => { + await expect(connectionPromise).rejects.toThrow() + // wait for connection be released to the pool + await testUtils.wait(400) + + expect(pool.has(address)).toBe(true) + const seenConnectionsToAddress = seenConnections.filter(conn => conn.address === address) + expect(seenConnectionsToAddress.length).toBe(1) + expect(seenConnectionsToAddress[0]._release).toHaveBeenCalledTimes(1) + }) + }) + + describe('when rediscovery is not done in time', () => { + let connectionPromise + let address + let pool + let seenConnections + + beforeEach(() => { + seenConnections = [] + address = server3 + const routingTable = newRoutingTable( + null, + [server1], + [server3], + [server5] + ) + pool = newPool({ seenConnections }) + const routerToRoutingTable = { + null: routingTable + } + const rediscovery = new FakeRediscovery({ + null: { + // Seed Router + [defaultSeedRouter.asKey()]: routingTable + } + }, null, 500) + + const connectionProvider = newRoutingConnectionProviderWithFakeRediscovery( + rediscovery, + pool, + routerToRoutingTable, + { + sessionConnectionTimeout: 300 + } + ) + + connectionPromise = connectionProvider + .acquireConnection({ accessMode: 'READ', database: '' }) + }) + + it('should reject with Session acquisition timeout error', async () => { + await expect(connectionPromise).rejects.toThrowError(newError( + 'Session acquisition timed out in 300 ms.' + )) + }) + + it('should acquire connection the default router', async () => { + await expect(connectionPromise).rejects.toThrow() + // wait for connection be released to the pool + await testUtils.wait(400) + + const seenConnectionsToAddress = seenConnections.filter(conn => conn.address === defaultSeedRouter) + expect(seenConnectionsToAddress.length).toBe(1) + }) + + it('should not acquire connection to reader', async () => { + await expect(connectionPromise).rejects.toThrow() + // wait for connection be released to the pool + await testUtils.wait(400) + + expect(pool.has(address)).toBe(false) + const seenConnectionsToAddress = seenConnections.filter(conn => conn.address === address) + expect(seenConnectionsToAddress.length).toBe(0) + }) + }) + + describe('when rediscovery is not done in time and router fails', () => { + let connectionPromise + let address + let pool + let seenConnections + + beforeEach(() => { + seenConnections = [] + address = server3 + const routingTable = newRoutingTable( + 'aDatabase', + [server1, server2], // routers + [server3], + [server5], + int(0) // expired + ) + pool = newPool({ seenConnections }) + const routerToRoutingTable = { + aDatabase: routingTable + } + const rediscovery = new FakeRediscovery({ + aDatabase: { + // Seed Router + [defaultSeedRouter.asKey()]: routingTable + } + }, newError('It is a non fast-failing error'), 500) + + const connectionProvider = newRoutingConnectionProviderWithFakeRediscovery( + rediscovery, + pool, + routerToRoutingTable, + { + sessionConnectionTimeout: 300 + }, + [routingTable] + ) + + // Force not use seed router + connectionProvider._useSeedRouter = false + + connectionPromise = connectionProvider + .acquireConnection({ accessMode: 'READ', database: 'aDatabase' }) + }) + + it('should reject with Session acquisition timeout error', async () => { + await expect(connectionPromise).rejects.toThrowError(newError( + 'Session acquisition timed out in 300 ms.' + )) + }) + + it('should acquire connection the router1', async () => { + await expect(connectionPromise).rejects.toThrow() + // Wait for connection to server 1 be created + await testUtils.wait(300) + + const seenConnectionsToAddress = seenConnections.filter(conn => conn.address === server1) + expect(seenConnectionsToAddress.length).toBe(1) + }) + + it('should not acquire connection the router2', async () => { + await expect(connectionPromise).rejects.toThrow() + // wait for connection server 2 be created + await testUtils.wait(800) + + expect(pool.has(address)).toBe(false) + const seenConnectionsToAddress = seenConnections.filter(conn => conn.address === server2) + expect(seenConnectionsToAddress.length).toBe(0) + }) + }) + }) + + describe('config.updateRoutingTableTimeout', () => { + describe('when routing table is refreshed in time', () => { + let connectionPromise + let address + let pool + + beforeEach(() => { + address = server3 + const routingTable = newRoutingTable( + null, + [server1], + [server3], + [server5] + ) + pool = newPool() + const routerToRoutingTable = { + null: routingTable + } + const rediscovery = new FakeRediscovery({ + null: { + // Seed Router + [defaultSeedRouter.asKey()]: routingTable + } + }) + + const connectionProvider = newRoutingConnectionProviderWithFakeRediscovery( + rediscovery, + pool, + routerToRoutingTable, + { + updateRoutingTableTimeout: 500 + } + ) + + connectionPromise = connectionProvider + .acquireConnection({ accessMode: 'READ', database: '' }) + }) + + it('should resolve with the connection', async () => { + const connection = await connectionPromise + + expect(connection).toBeDefined() + expect(connection.address).toEqual(address) + expect(pool.has(address)).toBeTruthy() + }) + }) + + describe('when routing connection is not acquired in time', () => { + let connectionPromise + let pool + let seenConnections + + beforeEach(() => { + seenConnections = [] + const routingTable = newRoutingTable( + null, + [server1], + [server3], + [server5] + ) + pool = newPool({ delay: 350, seenConnections }) + const routerToRoutingTable = { + null: routingTable + } + const rediscovery = new FakeRediscovery({ + null: { + // Seed Router + [defaultSeedRouter.asKey()]: routingTable + } + }) + + const connectionProvider = newRoutingConnectionProviderWithFakeRediscovery( + rediscovery, + pool, + routerToRoutingTable, + { + updateRoutingTableTimeout: 300 + } + ) + + connectionPromise = connectionProvider + .acquireConnection({ accessMode: 'READ', database: '' }) + }) + + it('should reject with routing table update timeout error', async () => { + await expect(connectionPromise).rejects.toThrowError(newError( + 'Routing table update timed out in 300 ms.' + )) + }) + }) + + describe('when rediscovery is not done in time', () => { + let connectionPromise + let pool + let seenConnections + + beforeEach(() => { + seenConnections = [] + const routingTable = newRoutingTable( + null, + [server1], + [server3], + [server5] + ) + pool = newPool({ seenConnections }) + const routerToRoutingTable = { + null: routingTable + } + const rediscovery = new FakeRediscovery({ + null: { + // Seed Router + [defaultSeedRouter.asKey()]: routingTable + } + }, null, 500) + + const connectionProvider = newRoutingConnectionProviderWithFakeRediscovery( + rediscovery, + pool, + routerToRoutingTable, + { + updateRoutingTableTimeout: 300 + } + ) + + connectionPromise = connectionProvider + .acquireConnection({ accessMode: 'READ', database: '' }) + }) + + it('should reject with routing table update timeout error', async () => { + await expect(connectionPromise).rejects.toThrowError(newError( + 'Routing table update timed out in 300 ms.' + )) + }) + + it('should release the connection', async () => { + await expect(connectionPromise).rejects.toThrow() + // wait for connection be released + await testUtils.wait(400) + + const seenConnectionsToAddress = seenConnections.filter(conn => conn.address === defaultSeedRouter) + expect(seenConnectionsToAddress.length).toBe(1) + }) + }) + + describe('when rediscovery is not done in time and router fails', () => { + let connectionPromise + let address + let pool + let seenConnections + + beforeEach(() => { + seenConnections = [] + address = server3 + const routingTable = newRoutingTable( + 'aDatabase', + [server1, server2], // routers + [server3], + [server5], + int(0) // expired + ) + pool = newPool({ seenConnections }) + const routerToRoutingTable = { + aDatabase: routingTable + } + const rediscovery = new FakeRediscovery({ + aDatabase: { + // Seed Router + [defaultSeedRouter.asKey()]: routingTable + } + }, newError('It is a non fast-failing error'), 500) + + const connectionProvider = newRoutingConnectionProviderWithFakeRediscovery( + rediscovery, + pool, + routerToRoutingTable, + { + updateRoutingTableTimeout: 300 + }, + [routingTable] + ) + + // Force not use seed router + connectionProvider._useSeedRouter = false + + connectionPromise = connectionProvider + .acquireConnection({ accessMode: 'READ', database: 'aDatabase' }) + }) + + it('should reject with routing table update timeout error', async () => { + await expect(connectionPromise).rejects.toThrowError(newError( + 'Routing table update timed out in 300 ms.' + )) + }) + + it('should acquire connection the router1', async () => { + await expect(connectionPromise).rejects.toThrow() + // Wait for connection to server 1 be created + await testUtils.wait(300) + + const seenConnectionsToAddress = seenConnections.filter(conn => conn.address === server1) + expect(seenConnectionsToAddress.length).toBe(1) + }) + + it('should not acquire connection the router2', async () => { + await expect(connectionPromise).rejects.toThrow() + // wait for connection server 2 be created + await testUtils.wait(800) + expect(pool.has(address)).toBe(false) + const seenConnectionsToAddress = seenConnections.filter(conn => conn.address === server2) + expect(seenConnectionsToAddress.length).toBe(0) + }) + }) }) }) -function newRoutingConnectionProvider ( - routingTables, +function newRoutingConnectionProviderWithFakeRediscovery ( + fakeRediscovery, pool = null, - routerToRoutingTable = { null: {} } + routerToRoutingTable = { null: {} }, + config = {}, + routingTables = [] ) { - const seedRouter = ServerAddress.fromUrl('server-non-existing-seed-router') + const seedRouter = defaultSeedRouter return newRoutingConnectionProviderWithSeedRouter( seedRouter, [seedRouter], routingTables, routerToRoutingTable, - pool + pool, + null, + fakeRediscovery, + config ) } @@ -2497,7 +2959,9 @@ function newRoutingConnectionProviderWithSeedRouter ( routingTables, routerToRoutingTable = { null: {} }, connectionPool = null, - routingTablePurgeDelay = null + routingTablePurgeDelay = null, + fakeRediscovery = null, + config = {} ) { const pool = connectionPool || newPool() const connectionProvider = new RoutingConnectionProvider({ @@ -2505,7 +2969,7 @@ function newRoutingConnectionProviderWithSeedRouter ( address: seedRouter, routingContext: {}, hostNameResolver: new SimpleHostNameResolver(), - config: {}, + config: config, log: Logger.noOp(), routingTablePurgeDelay: routingTablePurgeDelay }) @@ -2513,7 +2977,8 @@ function newRoutingConnectionProviderWithSeedRouter ( routingTables.forEach(r => { connectionProvider._routingTableRegistry.register(r) }) - connectionProvider._rediscovery = new FakeRediscovery(routerToRoutingTable) + connectionProvider._rediscovery = + fakeRediscovery || new FakeRediscovery(routerToRoutingTable) connectionProvider._hostNameResolver = new FakeDnsResolver(seedRouterResolved) connectionProvider._useSeedRouter = routingTables.every( r => r.expirationTime !== Integer.ZERO @@ -2521,6 +2986,21 @@ function newRoutingConnectionProviderWithSeedRouter ( return connectionProvider } +function newRoutingConnectionProvider ( + routingTables, + pool = null, + routerToRoutingTable = { null: {} } +) { + const seedRouter = defaultSeedRouter + return newRoutingConnectionProviderWithSeedRouter( + seedRouter, + [seedRouter], + routingTables, + routerToRoutingTable, + pool + ) +} + function newRoutingTableWithUser ({ database, routers, @@ -2560,17 +3040,25 @@ function setupRoutingConnectionProviderToRememberRouters ( const originalFetch = connectionProvider._fetchRoutingTable.bind( connectionProvider ) - const rememberingFetch = (routerAddresses, routingTable, bookmark) => { + const rememberingFetch = (routerAddresses, ...rest) => { routersArray.push(routerAddresses) - return originalFetch(routerAddresses, routingTable, bookmark) + return originalFetch(...[routerAddresses, ...rest]) } connectionProvider._fetchRoutingTable = rememberingFetch } -function newPool () { +function newPool ({ create, config, delay, seenConnections = [] } = {}) { + const _create = async (address, release) => { + await testUtils.wait(delay) + const connection = create != null + ? create(address, release) + : new FakeConnection(address, release, 'version', 4.0) + seenConnections.push(connection) + return connection + } return new Pool({ - create: (address, release) => - Promise.resolve(new FakeConnection(address, release, 'version', 4.0)) + config, + create: (address, release) => _create(address, release) }) } @@ -2612,6 +3100,7 @@ class FakeConnection extends Connection { this._version = version || VERSION_IN_DEV.toString() this._protocolVersion = protocolVersion this.release = release + this._release = jest.fn(() => release(address, this)) } get address () { @@ -2630,11 +3119,17 @@ class FakeConnection extends Connection { } class FakeRediscovery { - constructor (routerToRoutingTable) { + constructor (routerToRoutingTable, error, delay) { this._routerToRoutingTable = routerToRoutingTable + this._error = error + this._delay = delay } - lookupRoutingTableOnRouter (ignored, database, router, user) { + async lookupRoutingTableOnRouter (ignored, database, router, user) { + await testUtils.wait(this._delay) + if (this._error) { + throw this._error + } const table = this._routerToRoutingTable[database || null] if (table) { let routingTables = table[router.asKey()] @@ -2642,9 +3137,9 @@ class FakeRediscovery { if (routingTables instanceof Array) { routingTable = routingTables.find(rt => rt.user === user) } - return Promise.resolve(routingTable) + return routingTable } - return Promise.resolve(null) + return null } } diff --git a/packages/bolt-connection/test/test-utils.js b/packages/bolt-connection/test/test-utils.js index b9e112e03..40941fbbc 100644 --- a/packages/bolt-connection/test/test-utils.js +++ b/packages/bolt-connection/test/test-utils.js @@ -134,11 +134,16 @@ function spyProtocolWrite (protocol, callRealMethod = false) { return protocol } +function wait (time) { + return new Promise((resolve) => setTimeout(resolve, time)) +} + export default { isClient, isServer, fakeStandardDateWithOffset, matchers, MessageRecordingConnection, - spyProtocolWrite + spyProtocolWrite, + wait } diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts index c8bdc659e..e4658cbea 100644 --- a/packages/core/src/types.ts +++ b/packages/core/src/types.ts @@ -58,6 +58,8 @@ export interface Config { maxTransactionRetryTime?: number maxConnectionLifetime?: number connectionAcquisitionTimeout?: number + sessionConnectionTimeout?: number + updateRoutingTableTimeout?: number connectionTimeout?: number disableLosslessIntegers?: boolean useBigInt?: boolean diff --git a/packages/core/test/driver.test.ts b/packages/core/test/driver.test.ts index ab3071dad..fb96b8426 100644 --- a/packages/core/test/driver.test.ts +++ b/packages/core/test/driver.test.ts @@ -143,7 +143,7 @@ describe('Driver', () => { connectionAcquisitionTimeout: 60000, fetchSize: 1000, maxConnectionLifetime: 3600000, - maxConnectionPoolSize: 100, + maxConnectionPoolSize: 100 }, connectionProvider, database: '', diff --git a/packages/neo4j-driver-lite/src/index.ts b/packages/neo4j-driver-lite/src/index.ts index 9a183d32a..409cb801e 100644 --- a/packages/neo4j-driver-lite/src/index.ts +++ b/packages/neo4j-driver-lite/src/index.ts @@ -148,6 +148,19 @@ const { * // connection or borrow an existing one. * connectionAcquisitionTimeout: 60000, // 1 minute * + * // The maximum amout of time for a session to wait to acquire an usable connection. This encompasses *everything* + * // that needs to happen for this, including, if necessary, updating the routing table<, acquiring a connection + * // from the pool, and, if necessary performing a BOLT and Authentication handshake with the reader/writer. + * // Since this can include updating the routing table, it is recommended to keep this bigger than `updateRoutingTableTimeout`. + * sessionConnectionTimeout: null, // Disabled for not breaking compatibility. Recommended: 120000 ms + * + * // the maximum amount of time the driver will attempt to fetch a new routing table. This encompasses *everything* + * // that needs to happen for this, including fetching connections from the pool, performing handshakes, + * // and requesting and receiving a fresh routing table. + * // Since this includes acquiring a connection from the pool plus an extra round-trip for fetching the routing table, + * // it is recommended to keep this bigger than `connectionAcquisitionTimeout`. + * updateRoutingTableTimeout: null, // Disabled for not breaking compatibility. Recommended: 90000 ms + * * // Specify the maximum time in milliseconds transactions are allowed to retry via * // `Session#readTransaction()` and `Session#writeTransaction()` functions. * // These functions will retry the given unit of work on `ServiceUnavailable`, `SessionExpired` and transient diff --git a/packages/neo4j-driver/src/index.js b/packages/neo4j-driver/src/index.js index 1110b7811..b3dfc9f33 100644 --- a/packages/neo4j-driver/src/index.js +++ b/packages/neo4j-driver/src/index.js @@ -134,6 +134,19 @@ const { * // connection or borrow an existing one. * connectionAcquisitionTimeout: 60000, // 1 minute * + * // The maximum amout of time for a session to wait to acquire an usable connection. This encompasses *everything* + * // that needs to happen for this, including, if necessary, updating the routing table<, acquiring a connection + * // from the pool, and, if necessary performing a BOLT and Authentication handshake with the reader/writer. + * // Since this can include updating the routing table, it is recommended to keep this bigger than `updateRoutingTableTimeout`. + * sessionConnectionTimeout: null, // Disabled for not breaking compatibility. Recommended: 120000 ms + * + * // the maximum amount of time the driver will attempt to fetch a new routing table. This encompasses *everything* + * // that needs to happen for this, including fetching connections from the pool, performing handshakes, + * // and requesting and receiving a fresh routing table. + * // Since this includes acquiring a connection from the pool plus an extra round-trip for fetching the routing table, + * // it is recommended to keep this bigger than `connectionAcquisitionTimeout`. + * updateRoutingTableTimeout: null, // Disabled for not breaking compatibility. Recommended: 90000 ms + * * // Specify the maximum time in milliseconds transactions are allowed to retry via * // `Session#readTransaction()` and `Session#writeTransaction()` functions. * // These functions will retry the given unit of work on `ServiceUnavailable`, `SessionExpired` and transient diff --git a/packages/testkit-backend/src/request-handlers.js b/packages/testkit-backend/src/request-handlers.js index 882664643..5c055caac 100644 --- a/packages/testkit-backend/src/request-handlers.js +++ b/packages/testkit-backend/src/request-handlers.js @@ -81,6 +81,12 @@ export function NewDriver (context, data, wire) { if ('connectionAcquisitionTimeoutMs' in data) { config.connectionAcquisitionTimeout = data.connectionAcquisitionTimeoutMs } + if ('sessionConnectionTimeoutMs' in data) { + config.sessionConnectionTimeout = data.sessionConnectionTimeoutMs + } + if ('updateRoutingTableTimeoutMs' in data) { + config.updateRoutingTableTimeout = data.updateRoutingTableTimeoutMs + } if ('fetchSize' in data) { config.fetchSize = data.fetchSize } @@ -344,6 +350,8 @@ export function GetFeatures (_context, _params, wire) { 'Feature:Bolt:4.4', 'Feature:API:Result.List', 'Detail:ResultStreamWorksAfterBrokenRecord', + 'Feature:API:SessionConnectionTimeout', + 'Feature:API:UpdateRoutingTableTimeout', ...SUPPORTED_TLS ] }) diff --git a/packages/testkit-backend/src/skipped-tests/browser.js b/packages/testkit-backend/src/skipped-tests/browser.js index 0ba30d33d..9770880e2 100644 --- a/packages/testkit-backend/src/skipped-tests/browser.js +++ b/packages/testkit-backend/src/skipped-tests/browser.js @@ -1,5 +1,11 @@ import skip, { ifEndsWith, ifEquals, ifStartsWith } from './skip' const skippedTests = [ + skip( + 'Flacky in testkit', + ifStartsWith('stub.driver_parameters.test_update_routing_table_timeout_ms'), + ifStartsWith('stub.driver_parameters.test_session_connection_timeout'), + ifStartsWith('stub.driver_parameters.test_max_connection_pool_size.TestMaxConnectionPoolSize.test_connection_pool_maxes_out_at_100_by_default') + ), skip( "Browser doesn't support socket timeouts", ifStartsWith('stub.configuration_hints.test_connection_recv_timeout_seconds')