From 41fc61f5e5431f6568c70f8776edec72f28aaa5d Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Tue, 15 Feb 2022 13:53:05 +0100 Subject: [PATCH 1/3] Make connection aquisition timeout also consider the connection creation time The connection acquisition timeout must account for the whole acquisition execution time, whether a new connection is created, an idle connection is picked up instead or we need to wait until the full pool depletes. In particular, the connection acquisition timeout (CAT) has precedence over the socket connection timeout (SCT). If the SCT is set to 2 hours and CAT to 50ms, the connection acquisition should time out after 50ms (as usual, these evil cats win), even if the connection is successfully created within the SCT period. Note: if SCT is larger than or equal to CAT, a warning should be issued, as this is likely a misconfiguration (and big SCT values won't have any effect on connection acquisitions anyway). --- packages/bolt-connection/src/pool/pool.js | 78 +++++++++---------- .../testkit-backend/src/request-handlers.js | 4 + 2 files changed, 41 insertions(+), 41 deletions(-) diff --git a/packages/bolt-connection/src/pool/pool.js b/packages/bolt-connection/src/pool/pool.js index 3d4ce8a02..08a23b1d9 100644 --- a/packages/bolt-connection/src/pool/pool.js +++ b/packages/bolt-connection/src/pool/pool.js @@ -74,51 +74,47 @@ class Pool { * @return {Object} resource that is ready to use. */ acquire (address) { - return this._acquire(address).then(resource => { - const key = address.asKey() - if (resource) { - // New or existing resource acquired - return resource - } + const key = address.asKey() - // We're out of resources and will try to acquire later on when an existing resource is released. - const allRequests = this._acquireRequests - const requests = allRequests[key] - if (!requests) { - allRequests[key] = [] - } + // We're out of resources and will try to acquire later on when an existing resource is released. + const allRequests = this._acquireRequests + const requests = allRequests[key] + if (!requests) { + allRequests[key] = [] + } - return new Promise((resolve, reject) => { - let request - - const timeoutId = setTimeout(() => { - // acquisition timeout fired - - // remove request from the queue of pending requests, if it's still there - // request might've been taken out by the release operation - const pendingRequests = allRequests[key] - if (pendingRequests) { - allRequests[key] = pendingRequests.filter(item => item !== request) - } - - if (request.isCompleted()) { - // request already resolved/rejected by the release operation; nothing to do - } else { - // request is still pending and needs to be failed - const activeCount = this.activeResourceCount(address) - const idleCount = this.has(address) ? this._pools[key].length : 0 - request.reject( - newError( - `Connection acquisition timed out in ${this._acquisitionTimeout} ms. Pool status: Active conn count = ${activeCount}, Idle conn count = ${idleCount}.` - ) + return new Promise((resolve, reject) => { + let request + + const timeoutId = setTimeout(() => { + // acquisition timeout fired + + // remove request from the queue of pending requests, if it's still there + // request might've been taken out by the release operation + const pendingRequests = allRequests[key] + if (pendingRequests) { + allRequests[key] = pendingRequests.filter(item => item !== request) + } + + if (request.isCompleted()) { + // request already resolved/rejected by the release operation; nothing to do + } else { + // request is still pending and needs to be failed + const activeCount = this.activeResourceCount(address) + const idleCount = this.has(address) ? this._pools[key].length : 0 + request.reject( + newError( + `Connection acquisition timed out in ${this._acquisitionTimeout} ms. Pool status: Active conn count = ${activeCount}, Idle conn count = ${idleCount}.` ) - } - }, this._acquisitionTimeout) + ) + } + }, this._acquisitionTimeout) + + request = new PendingRequest(key, resolve, reject, timeoutId, this._log) + allRequests[key].push(request) - request = new PendingRequest(key, resolve, reject, timeoutId, this._log) - allRequests[key].push(request) - }) + this._processPendingAcquireRequests(address) }) } @@ -315,7 +311,7 @@ class Pool { _processPendingAcquireRequests (address) { const key = address.asKey() const requests = this._acquireRequests[key] - const poolState = this._poolState[key] + const poolState = this._poolState[key] || new PoolState() if (requests) { const pendingRequest = requests.shift() // pop a pending acquire request diff --git a/packages/testkit-backend/src/request-handlers.js b/packages/testkit-backend/src/request-handlers.js index 33d0de405..53e9bcd82 100644 --- a/packages/testkit-backend/src/request-handlers.js +++ b/packages/testkit-backend/src/request-handlers.js @@ -85,6 +85,9 @@ export function NewDriver (context, data, wire) { if ('connectionAcquisitionTimeoutMs' in data) { config.connectionAcquisitionTimeout = data.connectionAcquisitionTimeoutMs } + if ('connectionTimeoutMs' in data) { + config.connectionTimeout = data.connectionTimeoutMs + } if ('fetchSize' in data) { config.fetchSize = data.fetchSize } @@ -380,6 +383,7 @@ export function GetFeatures (_context, _params, wire) { 'Feature:Bolt:4.4', 'Feature:API:Result.List', 'Feature:API:Result.Peek', + 'Feature:Configuruation:ConnectionAcquisitionTimeout', 'Optimization:EagerTransactionBegin', 'Optimization:ImplicitDefaultArguments', 'Optimization:PullPipelining', From 45137ff72e1b2479b3d66b1685d12c0fd0e688a6 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Tue, 15 Feb 2022 19:08:28 +0100 Subject: [PATCH 2/3] Tests and set warning for miss-configuration --- .../src/channel/channel-config.js | 20 +-------- packages/bolt-connection/src/pool/pool.js | 9 +++- .../test/channel/node/node-channel.test.js | 2 +- .../bolt-connection/test/pool/pool.test.js | 41 +++++++++++++++++++ packages/core/src/driver.ts | 40 ++++++++++++++++-- packages/core/src/internal/constants.ts | 2 + packages/core/test/driver.test.ts | 40 ++++++++++++++++++ .../test/internal/channel-config.test.js | 22 +--------- .../testkit-backend/src/request-handlers.js | 2 +- 9 files changed, 132 insertions(+), 46 deletions(-) diff --git a/packages/bolt-connection/src/channel/channel-config.js b/packages/bolt-connection/src/channel/channel-config.js index 2c6417fc0..85df9b22c 100644 --- a/packages/bolt-connection/src/channel/channel-config.js +++ b/packages/bolt-connection/src/channel/channel-config.js @@ -25,8 +25,6 @@ const { const { SERVICE_UNAVAILABLE } = error -const DEFAULT_CONNECTION_TIMEOUT_MILLIS = 30000 // 30 seconds by default - const ALLOWED_VALUES_ENCRYPTED = [ null, undefined, @@ -58,7 +56,7 @@ export default class ChannelConfig { this.trustedCertificates = extractTrustedCertificates(driverConfig) this.knownHostsPath = extractKnownHostsPath(driverConfig) this.connectionErrorCode = connectionErrorCode || SERVICE_UNAVAILABLE - this.connectionTimeout = extractConnectionTimeout(driverConfig) + this.connectionTimeout = driverConfig.connectionTimeout } } @@ -90,19 +88,3 @@ function extractKnownHostsPath (driverConfig) { return driverConfig.knownHosts || null } -function extractConnectionTimeout (driverConfig) { - const configuredTimeout = parseInt(driverConfig.connectionTimeout, 10) - if (configuredTimeout === 0) { - // timeout explicitly configured to 0 - return null - } else if (configuredTimeout && configuredTimeout < 0) { - // timeout explicitly configured to a negative value - return null - } else if (!configuredTimeout) { - // timeout not configured, use default value - return DEFAULT_CONNECTION_TIMEOUT_MILLIS - } else { - // timeout configured, use the provided value - return configuredTimeout - } -} diff --git a/packages/bolt-connection/src/pool/pool.js b/packages/bolt-connection/src/pool/pool.js index 08a23b1d9..c63dcdca5 100644 --- a/packages/bolt-connection/src/pool/pool.js +++ b/packages/bolt-connection/src/pool/pool.js @@ -74,7 +74,6 @@ class Pool { * @return {Object} resource that is ready to use. */ acquire (address) { - const key = address.asKey() // We're out of resources and will try to acquire later on when an existing resource is released. @@ -83,7 +82,6 @@ class Pool { if (!requests) { allRequests[key] = [] } - return new Promise((resolve, reject) => { let request @@ -335,6 +333,13 @@ class Pool { // request is still pending and can be resolved with the newly acquired resource pendingRequest.resolve(resource) // resolve the pending request with the acquired resource } + } else { + // failed to acquire a valid resource from the pool + // return the pending request back to the pool + if (!this._acquireRequests[key]) { + this._acquireRequests[key] = [] + } + this._acquireRequests[key].unshift(pendingRequest) } }) } else { diff --git a/packages/bolt-connection/test/channel/node/node-channel.test.js b/packages/bolt-connection/test/channel/node/node-channel.test.js index 5d6b175b1..973965af9 100644 --- a/packages/bolt-connection/test/channel/node/node-channel.test.js +++ b/packages/bolt-connection/test/channel/node/node-channel.test.js @@ -110,7 +110,7 @@ describe('NodeChannel', () => { }) }) -function createMockedChannel (connected, config = {}) { +function createMockedChannel (connected, config = { connectionTimeout: 30000 }) { let endCallback = null const address = ServerAddress.fromUrl('bolt://localhost:9999') const channelConfig = new ChannelConfig(address, config, SERVICE_UNAVAILABLE) diff --git a/packages/bolt-connection/test/pool/pool.test.js b/packages/bolt-connection/test/pool/pool.test.js index d717ccbc4..90b627ee7 100644 --- a/packages/bolt-connection/test/pool/pool.test.js +++ b/packages/bolt-connection/test/pool/pool.test.js @@ -877,6 +877,40 @@ describe('#unit Pool', () => { expect(resource1.observer).toBeFalsy() expect(resource2.observer).toBeFalsy() }) + + it('should thrown aquisition timeout exception if resource takes longer to be created', async () => { + const address = ServerAddress.fromUrl('bolt://localhost:7687') + const acquisitionTimeout = 1000 + let counter = 0 + + const pool = new Pool({ + create: (server, release) => + new Promise(resolve => setTimeout( + () => resolve(new Resource(server, counter++, release)) + , acquisitionTimeout + 10)), + destroy: res => Promise.resolve(), + validate: resourceValidOnlyOnceValidationFunction, + config: new PoolConfig(1, acquisitionTimeout) + }) + + try { + await pool.acquire(address) + fail('should have thrown') + } catch (e) { + expect(e).toEqual( + newError( + `Connection acquisition timed out in ${acquisitionTimeout} ms. ` + + 'Pool status: Active conn count = 0, Idle conn count = 0.' + ) + ) + + const numberOfIdleResourceAfterResourceGetCreated = await new Promise(resolve => + setTimeout(() => resolve(idleResources(pool, address)), 11)) + + expect(numberOfIdleResourceAfterResourceGetCreated).toEqual(1) + expect(counter).toEqual(1) + } + }) }) function expectNoPendingAcquisitionRequests (pool) { @@ -895,6 +929,13 @@ function expectNoIdleResources (pool, address) { } } +function idleResources (pool, address) { + if (pool.has(address)) { + return pool._pools[address.asKey()].length + } + return undefined +} + function expectNumberOfAcquisitionRequests (pool, address, expectedNumber) { expect(pool._acquireRequests[address.asKey()].length).toEqual(expectedNumber) } diff --git a/packages/core/src/driver.ts b/packages/core/src/driver.ts index 6d37f6941..fc275630d 100644 --- a/packages/core/src/driver.ts +++ b/packages/core/src/driver.ts @@ -26,6 +26,7 @@ import { ACCESS_MODE_READ, ACCESS_MODE_WRITE, FETCH_ALL, + DEFAULT_CONNECTION_TIMEOUT_MILLIS, DEFAULT_POOL_ACQUISITION_TIMEOUT, DEFAULT_POOL_MAX_SIZE } from './internal/constants' @@ -131,12 +132,15 @@ class Driver { createSession: CreateSession = args => new Session(args) ) { sanitizeConfig(config) - validateConfig(config) + + const log = Logger.create(config) + + validateConfig(config, log) this._id = idGenerator++ this._meta = meta this._config = config - this._log = Logger.create(config) + this._log = log; this._createConnectionProvider = createConnectonProvider this._createSession = createSession @@ -366,13 +370,22 @@ class Driver { * @private * @returns {Object} the given config. */ -function validateConfig(config: any): any { +function validateConfig(config: any, log: Logger): any { const resolver = config.resolver if (resolver && typeof resolver !== 'function') { throw new TypeError( `Configured resolver should be a function. Got: ${resolver}` ) } + + if (config.connectionAcquisitionTimeout < config.connectionTimeout) { + log.warn( + 'Configuration for "connectionAcquisitionTimeout" should be greater than ' + + 'or equal to "connectionTimeout". Otherwise, the connection acquisition ' + + 'timeout will take precedence for over the connection timeout in scenarios ' + + 'where a new connection is created while it is acquired' + ) + } return config } @@ -396,6 +409,7 @@ function sanitizeConfig(config: any) { config.fetchSize, DEFAULT_FETCH_SIZE ) + config.connectionTimeout = extractConnectionTimeout(config) } /** @@ -431,6 +445,26 @@ function validateFetchSizeValue( } } +/** + * @private + */ +function extractConnectionTimeout (config: any): number|null { + const configuredTimeout = parseInt(config.connectionTimeout, 10) + if (configuredTimeout === 0) { + // timeout explicitly configured to 0 + return null + } else if (configuredTimeout && configuredTimeout < 0) { + // timeout explicitly configured to a negative value + return null + } else if (!configuredTimeout) { + // timeout not configured, use default value + return DEFAULT_CONNECTION_TIMEOUT_MILLIS + } else { + // timeout configured, use the provided value + return configuredTimeout + } +} + /** * @private * @returns {ConfiguredCustomResolver} new custom resolver that wraps the passed-in resolver function. diff --git a/packages/core/src/internal/constants.ts b/packages/core/src/internal/constants.ts index 3c6c1a589..922ab884f 100644 --- a/packages/core/src/internal/constants.ts +++ b/packages/core/src/internal/constants.ts @@ -20,6 +20,7 @@ const FETCH_ALL = -1 const DEFAULT_POOL_ACQUISITION_TIMEOUT = 60 * 1000 // 60 seconds const DEFAULT_POOL_MAX_SIZE = 100 +const DEFAULT_CONNECTION_TIMEOUT_MILLIS = 30000 // 30 seconds by default const ACCESS_MODE_READ: 'READ' = 'READ' const ACCESS_MODE_WRITE: 'WRITE' = 'WRITE' @@ -37,6 +38,7 @@ export { FETCH_ALL, ACCESS_MODE_READ, ACCESS_MODE_WRITE, + DEFAULT_CONNECTION_TIMEOUT_MILLIS, DEFAULT_POOL_ACQUISITION_TIMEOUT, DEFAULT_POOL_MAX_SIZE, BOLT_PROTOCOL_V1, diff --git a/packages/core/test/driver.test.ts b/packages/core/test/driver.test.ts index f34118af6..9b379f568 100644 --- a/packages/core/test/driver.test.ts +++ b/packages/core/test/driver.test.ts @@ -21,6 +21,7 @@ import Driver from '../src/driver' import { Bookmarks } from '../src/internal/bookmarks' import { Logger } from '../src/internal/logger' import { ConfiguredCustomResolver } from '../src/internal/resolver' +import { LogLevel } from '../src/types' describe('Driver', () => { let driver: Driver | null @@ -155,6 +156,44 @@ describe('Driver', () => { expect(driver.isEncrypted()).toEqual(expectedValue) }) + it.each([ + [{ connectionTimeout: 30000, connectionAcquisitionTimeout: 60000 }, true], + [{ connectionTimeout: null, connectionAcquisitionTimeout: 60000 }, true], + [{ connectionTimeout: 30000, connectionAcquisitionTimeout: null }, true], + [{ connectionTimeout: null, connectionAcquisitionTimeout: null }, true], + [{ connectionAcquisitionTimeout: 60000 }, true], + [{ connectionTimeout: 30000 }, true], + [{}, true], + [{ connectionTimeout: 30000, connectionAcquisitionTimeout: 20000 }, false], + [{ connectionAcquisitionTimeout: 20000 }, false], + [{ connectionTimeout: 70000 }, false], + // No connection timeouts should be considered valid, since it means + // the user doesn't case about the connection timeout at all. + [{ connectionTimeout: 0, connectionAcquisitionTimeout: 2000 }, true], + [{ connectionTimeout: -1, connectionAcquisitionTimeout: 2000 }, true], + ])('should emit warning if `connectionAcquisitionTimeout` and `connectionTimeout` are conflicting. [%o} ', async (config, valid) => { + const logging = { + level: 'warn' as LogLevel, + logger: jest.fn() + } + + const driver = new Driver(META_INFO, { ...config, logging }, mockCreateConnectonProvider(new ConnectionProvider()), createSession) + + if (valid) { + expect(logging.logger).not.toHaveBeenCalled() + } else { + expect(logging.logger).toHaveBeenCalledWith( + 'warn', + 'Configuration for "connectionAcquisitionTimeout" should be greater than ' + + 'or equal to "connectionTimeout". Otherwise, the connection acquisition ' + + 'timeout will take precedence for over the connection timeout in scenarios ' + + 'where a new connection is created while it is acquired' + ) + } + + await driver.close() + }) + function mockCreateConnectonProvider(connectionProvider: ConnectionProvider) { return ( id: number, @@ -172,6 +211,7 @@ describe('Driver', () => { fetchSize: 1000, maxConnectionLifetime: 3600000, maxConnectionPoolSize: 100, + connectionTimeout: 30000, }, connectionProvider, database: '', diff --git a/packages/neo4j-driver/test/internal/channel-config.test.js b/packages/neo4j-driver/test/internal/channel-config.test.js index 1d75b8c25..d15e863bc 100644 --- a/packages/neo4j-driver/test/internal/channel-config.test.js +++ b/packages/neo4j-driver/test/internal/channel-config.test.js @@ -111,28 +111,10 @@ describe('#unit ChannelConfig', () => { expect(config.connectionErrorCode).toEqual(SERVICE_UNAVAILABLE) }) - it('should have connection timeout by default', () => { + it('should have connection timeout being used as it is', () => { const config = new ChannelConfig(null, {}, '') - expect(config.connectionTimeout).toEqual(30000) - }) - - it('should respect configured connection timeout', () => { - const config = new ChannelConfig(null, { connectionTimeout: 424242 }, '') - - expect(config.connectionTimeout).toEqual(424242) - }) - - it('should respect disabled connection timeout with value zero', () => { - const config = new ChannelConfig(null, { connectionTimeout: 0 }, '') - - expect(config.connectionTimeout).toBeNull() - }) - - it('should respect disabled connection timeout with negative value', () => { - const config = new ChannelConfig(null, { connectionTimeout: -42 }, '') - - expect(config.connectionTimeout).toBeNull() + expect(config.connectionTimeout).toEqual(undefined) }) it('should validate value of "encrypted" property', () => { diff --git a/packages/testkit-backend/src/request-handlers.js b/packages/testkit-backend/src/request-handlers.js index 53e9bcd82..a09efd98b 100644 --- a/packages/testkit-backend/src/request-handlers.js +++ b/packages/testkit-backend/src/request-handlers.js @@ -383,7 +383,7 @@ export function GetFeatures (_context, _params, wire) { 'Feature:Bolt:4.4', 'Feature:API:Result.List', 'Feature:API:Result.Peek', - 'Feature:Configuruation:ConnectionAcquisitionTimeout', + 'Feature:Configuration:ConnectionAcquisitionTimeout', 'Optimization:EagerTransactionBegin', 'Optimization:ImplicitDefaultArguments', 'Optimization:PullPipelining', From 06f3a522cf669ba94d3dc83b5ef8a9d826ee2972 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Tue, 15 Feb 2022 20:25:09 +0100 Subject: [PATCH 3/3] Connection Acquisition Timeout set to zero will imply to always failing since. --- packages/bolt-connection/src/pool/pool.js | 8 +++++--- packages/neo4j-driver/test/driver.test.js | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/packages/bolt-connection/src/pool/pool.js b/packages/bolt-connection/src/pool/pool.js index c63dcdca5..14b38400a 100644 --- a/packages/bolt-connection/src/pool/pool.js +++ b/packages/bolt-connection/src/pool/pool.js @@ -336,10 +336,12 @@ class Pool { } else { // failed to acquire a valid resource from the pool // return the pending request back to the pool - if (!this._acquireRequests[key]) { - this._acquireRequests[key] = [] + if (!pendingRequest.isCompleted()) { + if (!this._acquireRequests[key]) { + this._acquireRequests[key] = [] + } + this._acquireRequests[key].unshift(pendingRequest) } - this._acquireRequests[key].unshift(pendingRequest) } }) } else { diff --git a/packages/neo4j-driver/test/driver.test.js b/packages/neo4j-driver/test/driver.test.js index 86282e9bf..c62b897d1 100644 --- a/packages/neo4j-driver/test/driver.test.js +++ b/packages/neo4j-driver/test/driver.test.js @@ -170,7 +170,7 @@ describe('#integration driver', () => { // Given const config = { maxConnectionPoolSize: 2, - connectionAcquisitionTimeout: 0, + connectionAcquisitionTimeout: 1000, encrypted: false } driver = neo4j.driver(