diff --git a/packages/bolt-connection/src/connection-provider/connection-provider-direct.js b/packages/bolt-connection/src/connection-provider/connection-provider-direct.js index f45abef1b..b4cb1ce61 100644 --- a/packages/bolt-connection/src/connection-provider/connection-provider-direct.js +++ b/packages/bolt-connection/src/connection-provider/connection-provider-direct.js @@ -24,7 +24,6 @@ import { ConnectionErrorHandler } from '../connection' import { internal, error } from 'neo4j-driver-core' -import { controller } from '../lang' const { constants: { BOLT_PROTOCOL_V3, BOLT_PROTOCOL_V4_0, BOLT_PROTOCOL_V4_4 } @@ -50,17 +49,12 @@ export default class DirectConnectionProvider extends PooledConnectionProvider { this._handleAuthorizationExpired(error, address, database) }) - const acquireConnectionJob = { - run: () => this._connectionPool - .acquire(this._address) - .then( - connection => - new DelegateConnection(connection, databaseSpecificErrorHandler) - ), - onTimeout: connection => connection._release() - } - - return controller.runWithTimeout(this._sessionConnectionTimeoutConfig, acquireConnectionJob) + return this._connectionPool + .acquire(this._address) + .then( + connection => + new DelegateConnection(connection, databaseSpecificErrorHandler) + ) } _handleAuthorizationExpired (error, address, database) { diff --git a/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js b/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js index ad05c1ab3..42cb30fbc 100644 --- a/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js +++ b/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js @@ -19,7 +19,7 @@ import { createChannelConnection, ConnectionErrorHandler } from '../connection' import Pool, { PoolConfig } from '../pool' -import { error, newError, ConnectionProvider, ServerInfo } from 'neo4j-driver-core' +import { error, ConnectionProvider, ServerInfo } from 'neo4j-driver-core' const { SERVICE_UNAVAILABLE } = error export default class PooledConnectionProvider extends ConnectionProvider { @@ -58,12 +58,6 @@ export default class PooledConnectionProvider extends ConnectionProvider { log: this._log }) this._openConnections = {} - this._sessionConnectionTimeoutConfig = { - timeout: this._config.sessionConnectionTimeout, - reason: () => newError( - `Session acquisition timed out in ${this._config.sessionConnectionTimeout} ms.` - ) - } } _createConnectionErrorHandler () { diff --git a/packages/bolt-connection/src/connection-provider/connection-provider-routing.js b/packages/bolt-connection/src/connection-provider/connection-provider-routing.js index 134be650d..c61f049b2 100644 --- a/packages/bolt-connection/src/connection-provider/connection-provider-routing.js +++ b/packages/bolt-connection/src/connection-provider/connection-provider-routing.js @@ -23,7 +23,6 @@ import { HostNameResolver } from '../channel' import SingleConnectionProvider from './connection-provider-single' import PooledConnectionProvider from './connection-provider-pooled' import { LeastConnectedLoadBalancingStrategy } from '../load-balancing' -import { controller } from '../lang' import { createChannelConnection, ConnectionErrorHandler, @@ -76,13 +75,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider ) }) - this._updateRoutingTableTimeoutConfig = { - timeout: this._config.updateRoutingTableTimeout, - reason: () => newError( - `Routing table update timed out in ${this._config.updateRoutingTableTimeout} ms.` - ) - } - this._routingContext = { ...routingContext, address: address.toString() } this._seedRouter = address this._rediscovery = new Rediscovery(this._routingContext) @@ -151,66 +143,53 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider this._handleAuthorizationExpired(error, address, context.database) ) - const refreshRoutingTableJob = { - run: async (_, cancelationToken) => { - const routingTable = await this._freshRoutingTable({ - accessMode, - database: context.database, - bookmarks: bookmarks, - impersonatedUser, - onDatabaseNameResolved: (databaseName) => { - context.database = context.database || databaseName - if (onDatabaseNameResolved) { - onDatabaseNameResolved(databaseName) - } - }, - cancelationToken - }) - - // select a target server based on specified access mode - if (accessMode === READ) { - address = this._loadBalancingStrategy.selectReader(routingTable.readers) - name = 'read' - } else if (accessMode === WRITE) { - address = this._loadBalancingStrategy.selectWriter(routingTable.writers) - name = 'write' - } else { - throw newError('Illegal mode ' + accessMode) - } - - // we couldn't select a target server - if (!address) { - throw newError( - `Failed to obtain connection towards ${name} server. Known routing table is: ${routingTable}`, - SESSION_EXPIRED - ) + const routingTable = await this._freshRoutingTable({ + accessMode, + database: context.database, + bookmarks: bookmarks, + impersonatedUser, + onDatabaseNameResolved: (databaseName) => { + context.database = context.database || databaseName + if (onDatabaseNameResolved) { + onDatabaseNameResolved(databaseName) } - return { routingTable, address } } - } + }) - const acquireConnectionJob = { - run: async ({ routingTable, address }) => { - try { - const connection = await this._acquireConnectionToServer( - address, - name, - routingTable - ) + // select a target server based on specified access mode + if (accessMode === READ) { + address = this._loadBalancingStrategy.selectReader(routingTable.readers) + name = 'read' + } else if (accessMode === WRITE) { + address = this._loadBalancingStrategy.selectWriter(routingTable.writers) + name = 'write' + } else { + throw newError('Illegal mode ' + accessMode) + } - return new DelegateConnection(connection, databaseSpecificErrorHandler) - } catch (error) { - const transformed = databaseSpecificErrorHandler.handleAndTransformError( - error, - address - ) - throw transformed - } - }, - onTimeout: connection => connection._release() + // we couldn't select a target server + if (!address) { + throw newError( + `Failed to obtain connection towards ${name} server. Known routing table is: ${routingTable}`, + SESSION_EXPIRED + ) } - return controller.runWithTimeout(this._sessionConnectionTimeoutConfig, refreshRoutingTableJob, acquireConnectionJob) + try { + const connection = await this._acquireConnectionToServer( + address, + name, + routingTable + ) + + return new DelegateConnection(connection, databaseSpecificErrorHandler) + } catch (error) { + const transformed = databaseSpecificErrorHandler.handleAndTransformError( + error, + address + ) + throw transformed + } } async _hasProtocolVersion (versionPredicate) { @@ -322,28 +301,22 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider return this._connectionPool.acquire(address) } - _freshRoutingTable ({ accessMode, database, bookmarks, impersonatedUser, onDatabaseNameResolved, cancelationToken = new controller.CancelationToken(() => false) } = {}) { - const refreshRoutingTableJob = { - run: (_, refreshCancelationToken) => { - const combinedCancelationToken = refreshCancelationToken.combine(cancelationToken) - const currentRoutingTable = this._routingTableRegistry.get( - database, - () => new RoutingTable({ database }) - ) + _freshRoutingTable ({ accessMode, database, bookmarks, impersonatedUser, onDatabaseNameResolved } = {}) { + const currentRoutingTable = this._routingTableRegistry.get( + database, + () => new RoutingTable({ database }) + ) - if (!currentRoutingTable.isStaleFor(accessMode)) { - return currentRoutingTable - } - this._log.info( - `Routing table is stale for database: "${database}" and access mode: "${accessMode}": ${currentRoutingTable}` - ) - return this._refreshRoutingTable(currentRoutingTable, bookmarks, impersonatedUser, onDatabaseNameResolved, combinedCancelationToken) - } + if (!currentRoutingTable.isStaleFor(accessMode)) { + return currentRoutingTable } - return controller.runWithTimeout(this._updateRoutingTableTimeoutConfig, refreshRoutingTableJob) + this._log.info( + `Routing table is stale for database: "${database}" and access mode: "${accessMode}": ${currentRoutingTable}` + ) + return this._refreshRoutingTable(currentRoutingTable, bookmarks, impersonatedUser, onDatabaseNameResolved) } - _refreshRoutingTable (currentRoutingTable, bookmarks, impersonatedUser, onDatabaseNameResolved, cancelationToken) { + _refreshRoutingTable (currentRoutingTable, bookmarks, impersonatedUser, onDatabaseNameResolved) { const knownRouters = currentRoutingTable.routers if (this._useSeedRouter) { @@ -352,8 +325,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider currentRoutingTable, bookmarks, impersonatedUser, - onDatabaseNameResolved, - cancelationToken + onDatabaseNameResolved ) } return this._fetchRoutingTableFromKnownRoutersFallbackToSeedRouter( @@ -361,8 +333,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider currentRoutingTable, bookmarks, impersonatedUser, - onDatabaseNameResolved, - cancelationToken + onDatabaseNameResolved ) } @@ -371,8 +342,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider currentRoutingTable, bookmarks, impersonatedUser, - onDatabaseNameResolved, - cancelationToken + onDatabaseNameResolved ) { // we start with seed router, no routers were probed before const seenRouters = [] @@ -381,8 +351,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider this._seedRouter, currentRoutingTable, bookmarks, - impersonatedUser, - cancelationToken + impersonatedUser ) if (newRoutingTable) { @@ -393,8 +362,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider knownRouters, currentRoutingTable, bookmarks, - impersonatedUser, - cancelationToken + impersonatedUser ) newRoutingTable = newRoutingTable2 error = error2 || error @@ -413,15 +381,13 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider currentRoutingTable, bookmarks, impersonatedUser, - onDatabaseNameResolved, - cancelationToken + onDatabaseNameResolved ) { let [newRoutingTable, error] = await this._fetchRoutingTableUsingKnownRouters( knownRouters, currentRoutingTable, bookmarks, - impersonatedUser, - cancelationToken + impersonatedUser ) if (!newRoutingTable) { @@ -431,8 +397,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider this._seedRouter, currentRoutingTable, bookmarks, - impersonatedUser, - cancelationToken + impersonatedUser ) } @@ -448,15 +413,13 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider knownRouters, currentRoutingTable, bookmarks, - impersonatedUser, - cancelationToken + impersonatedUser ) { const [newRoutingTable, error] = await this._fetchRoutingTable( knownRouters, currentRoutingTable, bookmarks, - impersonatedUser, - cancelationToken + impersonatedUser ) if (newRoutingTable) { @@ -481,19 +444,16 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider seedRouter, routingTable, bookmarks, - impersonatedUser, - cancelationToken + impersonatedUser ) { const resolvedAddresses = await this._resolveSeedRouter(seedRouter) - cancelationToken.throwIfCancellationRequested() - // filter out all addresses that we've already tried const newAddresses = resolvedAddresses.filter( address => seenRouters.indexOf(address) < 0 ) - return await this._fetchRoutingTable(newAddresses, routingTable, bookmarks, impersonatedUser, cancelationToken) + return await this._fetchRoutingTable(newAddresses, routingTable, bookmarks, impersonatedUser) } async _resolveSeedRouter (seedRouter) { @@ -505,7 +465,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider return [].concat.apply([], dnsResolvedAddresses) } - async _fetchRoutingTable (routerAddresses, routingTable, bookmarks, impersonatedUser, cancelationToken) { + async _fetchRoutingTable (routerAddresses, routingTable, bookmarks, impersonatedUser) { return routerAddresses.reduce( async (refreshedTablePromise, currentRouter, currentIndex) => { const [newRoutingTable] = await refreshedTablePromise @@ -539,13 +499,11 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider impersonatedUser ), null] } catch (error) { - cancelationToken.throwIfCancellationRequested() return this._handleRediscoveryError(error, currentRouter) } finally { - await session.close() + session.close() } } else { - cancelationToken.throwIfCancellationRequested() // unable to acquire connection and create session towards the current router // return null to signal that the next router should be tried return [null, error] diff --git a/packages/bolt-connection/src/lang/controller.js b/packages/bolt-connection/src/lang/controller.js deleted file mode 100644 index 63b3fcad0..000000000 --- a/packages/bolt-connection/src/lang/controller.js +++ /dev/null @@ -1,125 +0,0 @@ -/** - * Copyright (c) "Neo4j" - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * The cancel operation error - */ -export class OperationCanceledError extends Error { - constructor () { - super() - this.name = OperationCanceledError.name - } -} - -/** - * The CancelationToken used by `runWithTimeout` for cancel an working job. - */ -export class CancelationToken { - constructor (getCancelationRequested) { - this._getCancelationRequested = getCancelationRequested - Object.freeze(this) - } - - /** - * If it receive a cancelation request - */ - get isCancelationRequested () { - return this._getCancelationRequested() - } - - /** - * Combine two cancelations token in one. - * - * The new cancelation token will be canceled if one of the - * token get canceled. - * - * @param {CancelationToken} cancelationToken The other cancelation token - * @returns {CancelationToken} Combined cancelation toke - */ - combine (cancelationToken) { - return new CancelationToken(() => - this.isCancelationRequested === true || cancelationToken.isCancelationRequested === true) - } - - /** - * - * @param {Error} [error] the error to be thrown. Be default: OperationCanceledError will be thrown - * @throws {OperationCanceledError|Error} if a cancelation request was done - * @returns {void} - */ - throwIfCancellationRequested (error) { - if (this.isCancelationRequested) { - if (error != null) { - throw error - } - throw new OperationCanceledError() - } - } -} - -/** - * @typedef {Object} Job - * @property {function(any, CancelationToken)} run method called for run the job - * @property {function(any)} [onTimeout] method called after job finished and the controller has timeout. - * Useful for cleanups. - */ -/** - * @param {any} param0 - * @param {number} param0.timeout The timeout time - * @param {Error} param0.reason The reason for the timeout - * @param {...Job} jobs The jobs to be run in sequence - * @returns {Promise} The result of all the jobs or a timeout failure - */ -export function runWithTimeout ({ timeout, reason }, ...jobs) { - const status = { timedout: false } - const cancelationToken = new CancelationToken(() => status.timedout) - async function _run (currentValue, { resolve, reject }, myJobs) { - const [{ run, onTimeout = () => Promise.resolve() }, ...otherJobs] = myJobs - try { - const value = await run(currentValue, cancelationToken) - if (status.timedout) { - await onTimeout(value).catch(() => {}) - } else if (otherJobs.length === 0) { - resolve(value) - } else { - await _run(value, { resolve, reject }, otherJobs) - } - } catch (e) { - if (!status.timedout) { - reject(e) - } - } - } - - return new Promise((resolve, reject) => { - if (timeout != null) { - status.timeoutHandle = setTimeout(() => { - status.timedout = true - reject(reason()) - }, timeout) - } - - _run(undefined, { resolve, reject }, jobs) - .finally(() => { - if (status.timeoutHandle != null) { - clearTimeout(status.timeoutHandle) - } - }) - }) -} diff --git a/packages/bolt-connection/src/lang/index.js b/packages/bolt-connection/src/lang/index.js index a76639574..9d565fe8f 100644 --- a/packages/bolt-connection/src/lang/index.js +++ b/packages/bolt-connection/src/lang/index.js @@ -18,4 +18,3 @@ */ export * as functional from './functional' -export * as controller from './controller' diff --git a/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js b/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js index f0787fe0e..e1ec76394 100644 --- a/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js +++ b/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js @@ -21,7 +21,6 @@ import DirectConnectionProvider from '../../src/connection-provider/connection-p import { Pool } from '../../src/pool' import { Connection, DelegateConnection } from '../../src/connection' import { internal, newError, ServerInfo } from 'neo4j-driver-core' -import testUtils from '../test-utils' const { serverAddress: { ServerAddress }, @@ -140,68 +139,6 @@ it('should not change error when TokenExpired happens', async () => { expect(error).toBe(expectedError) }) -describe('config.sessionConnectionTimeout', () => { - describe('when connection is acquired in time', () => { - let connectionPromise - let address - let pool - - beforeEach(() => { - address = ServerAddress.fromUrl('localhost:123') - pool = newPool() - const connectionProvider = newDirectConnectionProvider(address, pool, { - sessionConnectionTimeout: 120000 - }) - - connectionPromise = connectionProvider - .acquireConnection({ accessMode: 'READ', database: '' }) - }) - - it('should resolve with the connection', async () => { - const connection = await connectionPromise - - expect(connection).toBeDefined() - expect(connection.address).toEqual(address) - expect(pool.has(address)).toBeTruthy() - }) - }) - - describe('when connection is not acquired in time', () => { - let connectionPromise - let address - let pool - let seenConnections - - beforeEach(() => { - seenConnections = [] - address = ServerAddress.fromUrl('localhost:123') - pool = newPool({ delay: 1000, seenConnections }) - const connectionProvider = newDirectConnectionProvider(address, pool, { - sessionConnectionTimeout: 500 - }) - - connectionPromise = connectionProvider - .acquireConnection({ accessMode: 'READ', database: '' }) - }) - - it('should reject with Session acquisition timeout error', async () => { - await expect(connectionPromise).rejects.toThrowError(newError( - 'Session acquisition timed out in 500 ms.' - )) - }) - - it('should return the connection back to the pool', async () => { - await expect(connectionPromise).rejects.toThrow() - // wait for connection be released to the pool - await testUtils.wait(600) - - expect(seenConnections.length).toBe(1) - expect(seenConnections[0]._release).toBeCalledTimes(1) - expect(pool.has(address)).toBe(true) - }) - }) -}) - describe('.verifyConnectivityAndGetServerInfo()', () => { describe('when connection is available in the pool', () => { it('should return the server info', async () => { @@ -314,7 +251,7 @@ describe('.verifyConnectivityAndGetServerInfo()', () => { const create = (address, release) => { const connection = new FakeConnection(address, release, server) connection.protocol = () => { - return { version: protocolVersion, isLastMessageLogin () { return false } } + return { version: protocolVersion, isLastMessageLogin() { return false } } } connection.resetAndFlush = resetAndFlush if (releaseMock) { @@ -376,10 +313,10 @@ describe('.verifyConnectivityAndGetServerInfo()', () => { }) }) -function newDirectConnectionProvider (address, pool, config) { +function newDirectConnectionProvider (address, pool) { const connectionProvider = new DirectConnectionProvider({ id: 0, - config: { ...config }, + config: {}, log: Logger.noOp(), address: address }) @@ -387,18 +324,17 @@ function newDirectConnectionProvider (address, pool, config) { return connectionProvider } -function newPool ({ create, config, delay, seenConnections = [] } = {}) { +function newPool ({ create, config } = {}) { const _create = (address, release) => { - const connection = create != null ? create(address, release) : new FakeConnection(address, release) - seenConnections.push(connection) - return connection + if (create) { + return create(address, release) + } + return new FakeConnection(address, release) } return new Pool({ config, - create: async (address, release) => { - await testUtils.wait(delay) - return _create(address, release) - } + create: (address, release) => + Promise.resolve(_create(address, release)) }) } diff --git a/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js b/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js index 07f6b352c..3055be8b1 100644 --- a/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js +++ b/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js @@ -31,7 +31,6 @@ import { Pool } from '../../src/pool' import SimpleHostNameResolver from '../../src/channel/browser/browser-host-name-resolver' import RoutingConnectionProvider from '../../src/connection-provider/connection-provider-routing' import { DelegateConnection, Connection } from '../../src/connection' -import testUtils from '../test-utils' const { serverAddress: { ServerAddress }, @@ -79,8 +78,6 @@ describe.each([ const serverDD = ServerAddress.fromUrl('serverDD') const serverEE = ServerAddress.fromUrl('serverEE') - const defaultSeedRouter = ServerAddress.fromUrl('server-non-existing-seed-router') - const usersDataSet = [ [null], [undefined], @@ -2564,461 +2561,6 @@ describe.each([ }) }) - describe('config.sessionConnectionTimeout', () => { - describe('when connection is acquired in time', () => { - let connectionPromise - let address - let pool - - beforeEach(() => { - address = server3 - const routingTable = newRoutingTable( - null, - [server1], - [server3], - [server5] - ) - pool = newPool() - const routerToRoutingTable = { - null: routingTable - } - const rediscovery = new FakeRediscovery({ - null: { - // Seed Router - [defaultSeedRouter.asKey()]: routingTable - } - }) - - const connectionProvider = newRoutingConnectionProviderWithFakeRediscovery( - rediscovery, - pool, - routerToRoutingTable, - { - sessionConnectionTimeout: 500 - } - ) - - connectionPromise = connectionProvider - .acquireConnection({ accessMode: 'READ', database: '' }) - }) - - it('should resolve with the connection', async () => { - const connection = await connectionPromise - - expect(connection).toBeDefined() - expect(connection.address).toEqual(address) - expect(pool.has(address)).toBeTruthy() - }) - }) - - describe('when connection is not acquired in time', () => { - let connectionPromise - let address - let pool - let seenConnections - - beforeEach(() => { - seenConnections = [] - address = server3 - const routingTable = newRoutingTable( - null, - [server1], - [server3], - [server5] - ) - pool = newPool({ delay: 350, seenConnections }) - const routerToRoutingTable = { - null: routingTable - } - const rediscovery = new FakeRediscovery({ - null: { - // Seed Router - [defaultSeedRouter.asKey()]: routingTable - } - }) - - const connectionProvider = newRoutingConnectionProviderWithFakeRediscovery( - rediscovery, - pool, - routerToRoutingTable, - { - sessionConnectionTimeout: 600 - } - ) - - connectionPromise = connectionProvider - .acquireConnection({ accessMode: 'READ', database: '' }) - }) - - it('should reject with Session acquisition timeout error', async () => { - await expect(connectionPromise).rejects.toThrowError(newError( - 'Session acquisition timed out in 600 ms.' - )) - }) - - it('should return the connection back to the pool', async () => { - await expect(connectionPromise).rejects.toThrow() - // wait for connection be released to the pool - await testUtils.wait(400) - - expect(pool.has(address)).toBe(true) - const seenConnectionsToAddress = seenConnections.filter(conn => conn.address === address) - expect(seenConnectionsToAddress.length).toBe(1) - expect(seenConnectionsToAddress[0]._release).toHaveBeenCalledTimes(1) - }) - }) - - describe('when rediscovery is not done in time', () => { - let connectionPromise - let address - let pool - let seenConnections - - beforeEach(() => { - seenConnections = [] - address = server3 - const routingTable = newRoutingTable( - null, - [server1], - [server3], - [server5] - ) - pool = newPool({ seenConnections }) - const routerToRoutingTable = { - null: routingTable - } - const rediscovery = new FakeRediscovery({ - null: { - // Seed Router - [defaultSeedRouter.asKey()]: routingTable - } - }, null, 500) - - const connectionProvider = newRoutingConnectionProviderWithFakeRediscovery( - rediscovery, - pool, - routerToRoutingTable, - { - sessionConnectionTimeout: 300 - } - ) - - connectionPromise = connectionProvider - .acquireConnection({ accessMode: 'READ', database: '' }) - }) - - it('should reject with Session acquisition timeout error', async () => { - await expect(connectionPromise).rejects.toThrowError(newError( - 'Session acquisition timed out in 300 ms.' - )) - }) - - it('should acquire connection the default router', async () => { - await expect(connectionPromise).rejects.toThrow() - // wait for connection be released to the pool - await testUtils.wait(400) - - const seenConnectionsToAddress = seenConnections.filter(conn => conn.address === defaultSeedRouter) - expect(seenConnectionsToAddress.length).toBe(1) - }) - - it('should not acquire connection to reader', async () => { - await expect(connectionPromise).rejects.toThrow() - // wait for connection be released to the pool - await testUtils.wait(400) - - expect(pool.has(address)).toBe(false) - const seenConnectionsToAddress = seenConnections.filter(conn => conn.address === address) - expect(seenConnectionsToAddress.length).toBe(0) - }) - }) - - describe('when rediscovery is not done in time and router fails', () => { - let connectionPromise - let address - let pool - let seenConnections - - beforeEach(() => { - seenConnections = [] - address = server3 - const routingTable = newRoutingTable( - 'aDatabase', - [server1, server2], // routers - [server3], - [server5], - int(0) // expired - ) - pool = newPool({ seenConnections }) - const routerToRoutingTable = { - aDatabase: routingTable - } - const rediscovery = new FakeRediscovery({ - aDatabase: { - // Seed Router - [defaultSeedRouter.asKey()]: routingTable - } - }, newError('It is a non fast-failing error'), 500) - - const connectionProvider = newRoutingConnectionProviderWithFakeRediscovery( - rediscovery, - pool, - routerToRoutingTable, - { - sessionConnectionTimeout: 300 - }, - [routingTable] - ) - - // Force not use seed router - connectionProvider._useSeedRouter = false - - connectionPromise = connectionProvider - .acquireConnection({ accessMode: 'READ', database: 'aDatabase' }) - }) - - it('should reject with Session acquisition timeout error', async () => { - await expect(connectionPromise).rejects.toThrowError(newError( - 'Session acquisition timed out in 300 ms.' - )) - }) - - it('should acquire connection the router1', async () => { - await expect(connectionPromise).rejects.toThrow() - // Wait for connection to server 1 be created - await testUtils.wait(300) - - const seenConnectionsToAddress = seenConnections.filter(conn => conn.address === server1) - expect(seenConnectionsToAddress.length).toBe(1) - }) - - it('should not acquire connection the router2', async () => { - await expect(connectionPromise).rejects.toThrow() - // wait for connection server 2 be created - await testUtils.wait(800) - - expect(pool.has(address)).toBe(false) - const seenConnectionsToAddress = seenConnections.filter(conn => conn.address === server2) - expect(seenConnectionsToAddress.length).toBe(0) - }) - }) - }) - - describe('config.updateRoutingTableTimeout', () => { - describe('when routing table is refreshed in time', () => { - let connectionPromise - let address - let pool - - beforeEach(() => { - address = server3 - const routingTable = newRoutingTable( - null, - [server1], - [server3], - [server5] - ) - pool = newPool() - const routerToRoutingTable = { - null: routingTable - } - const rediscovery = new FakeRediscovery({ - null: { - // Seed Router - [defaultSeedRouter.asKey()]: routingTable - } - }) - - const connectionProvider = newRoutingConnectionProviderWithFakeRediscovery( - rediscovery, - pool, - routerToRoutingTable, - { - updateRoutingTableTimeout: 500 - } - ) - - connectionPromise = connectionProvider - .acquireConnection({ accessMode: 'READ', database: '' }) - }) - - it('should resolve with the connection', async () => { - const connection = await connectionPromise - - expect(connection).toBeDefined() - expect(connection.address).toEqual(address) - expect(pool.has(address)).toBeTruthy() - }) - }) - - describe('when routing connection is not acquired in time', () => { - let connectionPromise - let pool - let seenConnections - - beforeEach(() => { - seenConnections = [] - const routingTable = newRoutingTable( - null, - [server1], - [server3], - [server5] - ) - pool = newPool({ delay: 350, seenConnections }) - const routerToRoutingTable = { - null: routingTable - } - const rediscovery = new FakeRediscovery({ - null: { - // Seed Router - [defaultSeedRouter.asKey()]: routingTable - } - }) - - const connectionProvider = newRoutingConnectionProviderWithFakeRediscovery( - rediscovery, - pool, - routerToRoutingTable, - { - updateRoutingTableTimeout: 300 - } - ) - - connectionPromise = connectionProvider - .acquireConnection({ accessMode: 'READ', database: '' }) - }) - - it('should reject with routing table update timeout error', async () => { - await expect(connectionPromise).rejects.toThrowError(newError( - 'Routing table update timed out in 300 ms.' - )) - }) - }) - - describe('when rediscovery is not done in time', () => { - let connectionPromise - let pool - let seenConnections - - beforeEach(() => { - seenConnections = [] - const routingTable = newRoutingTable( - null, - [server1], - [server3], - [server5] - ) - pool = newPool({ seenConnections }) - const routerToRoutingTable = { - null: routingTable - } - const rediscovery = new FakeRediscovery({ - null: { - // Seed Router - [defaultSeedRouter.asKey()]: routingTable - } - }, null, 500) - - const connectionProvider = newRoutingConnectionProviderWithFakeRediscovery( - rediscovery, - pool, - routerToRoutingTable, - { - updateRoutingTableTimeout: 300 - } - ) - - connectionPromise = connectionProvider - .acquireConnection({ accessMode: 'READ', database: '' }) - }) - - it('should reject with routing table update timeout error', async () => { - await expect(connectionPromise).rejects.toThrowError(newError( - 'Routing table update timed out in 300 ms.' - )) - }) - - it('should release the connection', async () => { - await expect(connectionPromise).rejects.toThrow() - // wait for connection be released - await testUtils.wait(400) - - const seenConnectionsToAddress = seenConnections.filter(conn => conn.address === defaultSeedRouter) - expect(seenConnectionsToAddress.length).toBe(1) - }) - }) - - describe('when rediscovery is not done in time and router fails', () => { - let connectionPromise - let address - let pool - let seenConnections - - beforeEach(() => { - seenConnections = [] - address = server3 - const routingTable = newRoutingTable( - 'aDatabase', - [server1, server2], // routers - [server3], - [server5], - int(0) // expired - ) - pool = newPool({ seenConnections }) - const routerToRoutingTable = { - aDatabase: routingTable - } - const rediscovery = new FakeRediscovery({ - aDatabase: { - // Seed Router - [defaultSeedRouter.asKey()]: routingTable - } - }, newError('It is a non fast-failing error'), 500) - - const connectionProvider = newRoutingConnectionProviderWithFakeRediscovery( - rediscovery, - pool, - routerToRoutingTable, - { - updateRoutingTableTimeout: 300 - }, - [routingTable] - ) - - // Force not use seed router - connectionProvider._useSeedRouter = false - - connectionPromise = connectionProvider - .acquireConnection({ accessMode: 'READ', database: 'aDatabase' }) - }) - - it('should reject with routing table update timeout error', async () => { - await expect(connectionPromise).rejects.toThrowError(newError( - 'Routing table update timed out in 300 ms.' - )) - }) - - it('should acquire connection the router1', async () => { - await expect(connectionPromise).rejects.toThrow() - // Wait for connection to server 1 be created - await testUtils.wait(300) - - const seenConnectionsToAddress = seenConnections.filter(conn => conn.address === server1) - expect(seenConnectionsToAddress.length).toBe(1) - }) - - it('should not acquire connection the router2', async () => { - await expect(connectionPromise).rejects.toThrow() - // wait for connection server 2 be created - await testUtils.wait(800) - - expect(pool.has(address)).toBe(false) - const seenConnectionsToAddress = seenConnections.filter(conn => conn.address === server2) - expect(seenConnectionsToAddress.length).toBe(0) - }) - }) - }) - describe.each([ [undefined, READ], [undefined, WRITE], @@ -3389,14 +2931,16 @@ describe.each([ }) }) - function newPool ({ create, config, delay, seenConnections = [] } = {}) { - const _create = async (address, release) => { - await testUtils.wait(delay) - const connection = create != null - ? create(address, release) - : new FakeConnection(address, release, 'version', PROTOCOL_VERSION) - seenConnections.push(connection) - return connection + function newPool ({ create, config } = {}) { + const _create = (address, release) => { + if (create) { + try { + return Promise.resolve(create(address, release)) + } catch (e) { + return Promise.reject(e) + } + } + return Promise.resolve(new FakeConnection(address, release, 'version', PROTOCOL_VERSION)) } return new Pool({ config, @@ -3411,8 +2955,7 @@ describe.each([ routerToRoutingTable = { null: {} }, connectionPool = null, routingTablePurgeDelay = null, - fakeRediscovery = null, - config = {} + fakeRediscovery = null ) { const pool = connectionPool || newPool() const connectionProvider = new RoutingConnectionProvider({ @@ -3420,7 +2963,7 @@ describe.each([ address: seedRouter, routingContext: {}, hostNameResolver: new SimpleHostNameResolver(), - config: config, + config: {}, log: Logger.noOp(), routingTablePurgeDelay: routingTablePurgeDelay }) @@ -3442,7 +2985,7 @@ describe.each([ pool = null, routerToRoutingTable = { null: {} } ) { - const seedRouter = defaultSeedRouter + const seedRouter = ServerAddress.fromUrl('server-non-existing-seed-router') return newRoutingConnectionProviderWithSeedRouter( seedRouter, [seedRouter], @@ -3455,20 +2998,17 @@ describe.each([ function newRoutingConnectionProviderWithFakeRediscovery ( fakeRediscovery, pool = null, - routerToRoutingTable = { null: {} }, - config = {}, - routingTables = [] + routerToRoutingTable = { null: {} } ) { - const seedRouter = defaultSeedRouter + const seedRouter = ServerAddress.fromUrl('server-non-existing-seed-router') return newRoutingConnectionProviderWithSeedRouter( seedRouter, [seedRouter], - routingTables, + [], routerToRoutingTable, pool, null, - fakeRediscovery, - config + fakeRediscovery ) } }) @@ -3512,9 +3052,9 @@ function setupRoutingConnectionProviderToRememberRouters ( const originalFetch = connectionProvider._fetchRoutingTable.bind( connectionProvider ) - const rememberingFetch = (routerAddresses, ...rest) => { + const rememberingFetch = (routerAddresses, routingTable, bookmarks) => { routersArray.push(routerAddresses) - return originalFetch(...[routerAddresses, ...rest]) + return originalFetch(routerAddresses, routingTable, bookmarks) } connectionProvider._fetchRoutingTable = rememberingFetch } @@ -3583,16 +3123,14 @@ class FakeConnection extends Connection { } class FakeRediscovery { - constructor (routerToRoutingTable, error, delay) { + constructor (routerToRoutingTable, error) { this._routerToRoutingTable = routerToRoutingTable this._error = error - this._delay = delay } - async lookupRoutingTableOnRouter (ignored, database, router, user) { - await testUtils.wait(this._delay) + lookupRoutingTableOnRouter (ignored, database, router, user) { if (this._error) { - throw this._error + return Promise.reject(this._error) } const table = this._routerToRoutingTable[database || null] if (table) { @@ -3601,9 +3139,9 @@ class FakeRediscovery { if (routingTables instanceof Array) { routingTable = routingTables.find(rt => rt.user === user) } - return routingTable + return Promise.resolve(routingTable) } - return null + return Promise.resolve(null) } } diff --git a/packages/bolt-connection/test/test-utils.js b/packages/bolt-connection/test/test-utils.js index 40941fbbc..b9e112e03 100644 --- a/packages/bolt-connection/test/test-utils.js +++ b/packages/bolt-connection/test/test-utils.js @@ -134,16 +134,11 @@ function spyProtocolWrite (protocol, callRealMethod = false) { return protocol } -function wait (time) { - return new Promise((resolve) => setTimeout(resolve, time)) -} - export default { isClient, isServer, fakeStandardDateWithOffset, matchers, MessageRecordingConnection, - spyProtocolWrite, - wait + spyProtocolWrite } diff --git a/packages/core/src/driver.ts b/packages/core/src/driver.ts index 7a7d33927..241115e8d 100644 --- a/packages/core/src/driver.ts +++ b/packages/core/src/driver.ts @@ -27,9 +27,7 @@ import { FETCH_ALL, DEFAULT_CONNECTION_TIMEOUT_MILLIS, DEFAULT_POOL_ACQUISITION_TIMEOUT, - DEFAULT_POOL_MAX_SIZE, - DEFAULT_SESSION_CONNECTION_TIMEOUT, - DEFAULT_UPDATE_ROUTING_TABLE_TIMEOUT + DEFAULT_POOL_MAX_SIZE } from './internal/constants' import { Logger } from './internal/logger' import Session from './session' @@ -436,14 +434,6 @@ function sanitizeConfig (config: any): void { config.fetchSize, DEFAULT_FETCH_SIZE ) - config.sessionConnectionTimeout = sanitizeIntValue( - config.sessionConnectionTimeout, - DEFAULT_SESSION_CONNECTION_TIMEOUT - ) - config.updateRoutingTableTimeout = sanitizeIntValue( - config.updateRoutingTableTimeout, - DEFAULT_UPDATE_ROUTING_TABLE_TIMEOUT - ) config.connectionTimeout = extractConnectionTimeout(config) } diff --git a/packages/core/src/internal/constants.ts b/packages/core/src/internal/constants.ts index 11d675273..39e790f2f 100644 --- a/packages/core/src/internal/constants.ts +++ b/packages/core/src/internal/constants.ts @@ -19,8 +19,6 @@ const FETCH_ALL = -1 const DEFAULT_POOL_ACQUISITION_TIMEOUT = 60 * 1000 // 60 seconds -const DEFAULT_SESSION_CONNECTION_TIMEOUT = 120 * 1000 // 120 seconds -const DEFAULT_UPDATE_ROUTING_TABLE_TIMEOUT = 90 * 1000 // 90 seconds const DEFAULT_POOL_MAX_SIZE = 100 const DEFAULT_CONNECTION_TIMEOUT_MILLIS = 30000 // 30 seconds by default @@ -43,8 +41,6 @@ export { ACCESS_MODE_WRITE, DEFAULT_CONNECTION_TIMEOUT_MILLIS, DEFAULT_POOL_ACQUISITION_TIMEOUT, - DEFAULT_SESSION_CONNECTION_TIMEOUT, - DEFAULT_UPDATE_ROUTING_TABLE_TIMEOUT, DEFAULT_POOL_MAX_SIZE, BOLT_PROTOCOL_V1, BOLT_PROTOCOL_V2, diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts index c904bd16f..63140cc16 100644 --- a/packages/core/src/types.ts +++ b/packages/core/src/types.ts @@ -58,8 +58,6 @@ export interface Config { maxTransactionRetryTime?: number maxConnectionLifetime?: number connectionAcquisitionTimeout?: number - sessionConnectionTimeout?: number - updateRoutingTableTimeout?: number connectionTimeout?: number disableLosslessIntegers?: boolean useBigInt?: boolean diff --git a/packages/core/test/driver.test.ts b/packages/core/test/driver.test.ts index dd41b89d2..c465167c8 100644 --- a/packages/core/test/driver.test.ts +++ b/packages/core/test/driver.test.ts @@ -252,9 +252,7 @@ describe('Driver', () => { fetchSize: 1000, maxConnectionLifetime: 3600000, maxConnectionPoolSize: 100, - connectionTimeout: 30000, - sessionConnectionTimeout: 120000, - updateRoutingTableTimeout: 90000 + connectionTimeout: 30000 }, connectionProvider, database: '', diff --git a/packages/neo4j-driver-lite/src/index.ts b/packages/neo4j-driver-lite/src/index.ts index 93f0d8234..3456b190c 100644 --- a/packages/neo4j-driver-lite/src/index.ts +++ b/packages/neo4j-driver-lite/src/index.ts @@ -151,19 +151,6 @@ const { * // connection or borrow an existing one. * connectionAcquisitionTimeout: 60000, // 1 minute * - * // The maximum amout of time for a session to wait to acquire an usable connection. This encompasses *everything* - * // that needs to happen for this, including, if necessary, updating the routing table<, acquiring a connection - * // from the pool, and, if necessary performing a BOLT and Authentication handshake with the reader/writer. - * // Since this can include updating the routing table, it is recommended to keep this bigger than `updateRoutingTableTimeout`. - * sessionConnectionTimeout: 120000, // 2 minutes - * - * // the maximum amount of time the driver will attempt to fetch a new routing table. This encompasses *everything* - * // that needs to happen for this, including fetching connections from the pool, performing handshakes, - * // and requesting and receiving a fresh routing table. - * // Since this includes acquiring a connection from the pool plus an extra round-trip for fetching the routing table, - * // it is recommended to keep this bigger than `connectionAcquisitionTimeout`. - * updateRoutingTableTimeout: 90000, // 90 seconds - * * // Specify the maximum time in milliseconds transactions are allowed to retry via * // `Session#executeRead()` and `Session#executeWrite()` functions. * // These functions will retry the given unit of work on `ServiceUnavailable`, `SessionExpired` and transient diff --git a/packages/neo4j-driver/src/index.js b/packages/neo4j-driver/src/index.js index 2f7d506c6..d706e5add 100644 --- a/packages/neo4j-driver/src/index.js +++ b/packages/neo4j-driver/src/index.js @@ -137,19 +137,6 @@ const { * // connection or borrow an existing one. * connectionAcquisitionTimeout: 60000, // 1 minute * - * // The maximum amout of time for a session to wait to acquire an usable connection. This encompasses *everything* - * // that needs to happen for this, including, if necessary, updating the routing table<, acquiring a connection - * // from the pool, and, if necessary performing a BOLT and Authentication handshake with the reader/writer. - * // Since this can include updating the routing table, it is recommended to keep this bigger than `updateRoutingTableTimeout`. - * sessionConnectionTimeout: 120000, // 2 minutes - * - * // the maximum amount of time the driver will attempt to fetch a new routing table. This encompasses *everything* - * // that needs to happen for this, including fetching connections from the pool, performing handshakes, - * // and requesting and receiving a fresh routing table. - * // Since this includes acquiring a connection from the pool plus an extra round-trip for fetching the routing table, - * // it is recommended to keep this bigger than `connectionAcquisitionTimeout`. - * updateRoutingTableTimeout: 90000, // 90 seconds - * * // Specify the maximum time in milliseconds transactions are allowed to retry via * // `Session#executeRead()` and `Session#executeWrite()` functions. * // These functions will retry the given unit of work on `ServiceUnavailable`, `SessionExpired` and transient diff --git a/packages/testkit-backend/src/feature/common.js b/packages/testkit-backend/src/feature/common.js index c66c012da..26d7e4e12 100644 --- a/packages/testkit-backend/src/feature/common.js +++ b/packages/testkit-backend/src/feature/common.js @@ -31,8 +31,6 @@ const features = [ 'Feature:Bolt:5.0', 'Feature:Bolt:Patch:UTC', 'Feature:API:ConnectionAcquisitionTimeout', - 'Feature:API:SessionConnectionTimeout', - 'Feature:API:UpdateRoutingTableTimeout', 'Feature:API:Driver:GetServerInfo', 'Feature:API:Driver.VerifyConnectivity', 'Feature:API:Result.Peek', diff --git a/packages/testkit-backend/src/request-handlers.js b/packages/testkit-backend/src/request-handlers.js index e9971f94e..9abc93dd8 100644 --- a/packages/testkit-backend/src/request-handlers.js +++ b/packages/testkit-backend/src/request-handlers.js @@ -75,12 +75,6 @@ export function NewDriver (context, data, wire) { if ('connectionAcquisitionTimeoutMs' in data) { config.connectionAcquisitionTimeout = data.connectionAcquisitionTimeoutMs } - if ('sessionConnectionTimeoutMs' in data) { - config.sessionConnectionTimeout = data.sessionConnectionTimeoutMs - } - if ('updateRoutingTableTimeoutMs' in data) { - config.updateRoutingTableTimeout = data.updateRoutingTableTimeoutMs - } if ('connectionTimeoutMs' in data) { config.connectionTimeout = data.connectionTimeoutMs }