Skip to content

Make connection acquisition timeout also consider the connection creation time #877

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 1 addition & 19 deletions packages/bolt-connection/src/channel/channel-config.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ const {

const { SERVICE_UNAVAILABLE } = error

const DEFAULT_CONNECTION_TIMEOUT_MILLIS = 30000 // 30 seconds by default

const ALLOWED_VALUES_ENCRYPTED = [
null,
undefined,
Expand Down Expand Up @@ -58,7 +56,7 @@ export default class ChannelConfig {
this.trustedCertificates = extractTrustedCertificates(driverConfig)
this.knownHostsPath = extractKnownHostsPath(driverConfig)
this.connectionErrorCode = connectionErrorCode || SERVICE_UNAVAILABLE
this.connectionTimeout = extractConnectionTimeout(driverConfig)
this.connectionTimeout = driverConfig.connectionTimeout
}
}

Expand Down Expand Up @@ -90,19 +88,3 @@ function extractKnownHostsPath (driverConfig) {
return driverConfig.knownHosts || null
}

function extractConnectionTimeout (driverConfig) {
const configuredTimeout = parseInt(driverConfig.connectionTimeout, 10)
if (configuredTimeout === 0) {
// timeout explicitly configured to 0
return null
} else if (configuredTimeout && configuredTimeout < 0) {
// timeout explicitly configured to a negative value
return null
} else if (!configuredTimeout) {
// timeout not configured, use default value
return DEFAULT_CONNECTION_TIMEOUT_MILLIS
} else {
// timeout configured, use the provided value
return configuredTimeout
}
}
85 changes: 44 additions & 41 deletions packages/bolt-connection/src/pool/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,51 +74,45 @@ class Pool {
* @return {Object} resource that is ready to use.
*/
acquire (address) {
return this._acquire(address).then(resource => {
const key = address.asKey()
const key = address.asKey()

if (resource) {
// New or existing resource acquired
return resource
}
// We're out of resources and will try to acquire later on when an existing resource is released.
const allRequests = this._acquireRequests
const requests = allRequests[key]
if (!requests) {
allRequests[key] = []
}
return new Promise((resolve, reject) => {
let request

// We're out of resources and will try to acquire later on when an existing resource is released.
const allRequests = this._acquireRequests
const requests = allRequests[key]
if (!requests) {
allRequests[key] = []
}
const timeoutId = setTimeout(() => {
// acquisition timeout fired

return new Promise((resolve, reject) => {
let request

const timeoutId = setTimeout(() => {
// acquisition timeout fired

// remove request from the queue of pending requests, if it's still there
// request might've been taken out by the release operation
const pendingRequests = allRequests[key]
if (pendingRequests) {
allRequests[key] = pendingRequests.filter(item => item !== request)
}

if (request.isCompleted()) {
// request already resolved/rejected by the release operation; nothing to do
} else {
// request is still pending and needs to be failed
const activeCount = this.activeResourceCount(address)
const idleCount = this.has(address) ? this._pools[key].length : 0
request.reject(
newError(
`Connection acquisition timed out in ${this._acquisitionTimeout} ms. Pool status: Active conn count = ${activeCount}, Idle conn count = ${idleCount}.`
)
// remove request from the queue of pending requests, if it's still there
// request might've been taken out by the release operation
const pendingRequests = allRequests[key]
if (pendingRequests) {
allRequests[key] = pendingRequests.filter(item => item !== request)
}

if (request.isCompleted()) {
// request already resolved/rejected by the release operation; nothing to do
} else {
// request is still pending and needs to be failed
const activeCount = this.activeResourceCount(address)
const idleCount = this.has(address) ? this._pools[key].length : 0
request.reject(
newError(
`Connection acquisition timed out in ${this._acquisitionTimeout} ms. Pool status: Active conn count = ${activeCount}, Idle conn count = ${idleCount}.`
)
}
}, this._acquisitionTimeout)
)
}
}, this._acquisitionTimeout)

request = new PendingRequest(key, resolve, reject, timeoutId, this._log)
allRequests[key].push(request)
})
request = new PendingRequest(key, resolve, reject, timeoutId, this._log)
allRequests[key].push(request)

this._processPendingAcquireRequests(address)
})
}

Expand Down Expand Up @@ -315,7 +309,7 @@ class Pool {
_processPendingAcquireRequests (address) {
const key = address.asKey()
const requests = this._acquireRequests[key]
const poolState = this._poolState[key]
const poolState = this._poolState[key] || new PoolState()
if (requests) {
const pendingRequest = requests.shift() // pop a pending acquire request

Expand All @@ -339,6 +333,15 @@ class Pool {
// request is still pending and can be resolved with the newly acquired resource
pendingRequest.resolve(resource) // resolve the pending request with the acquired resource
}
} else {
// failed to acquire a valid resource from the pool
// return the pending request back to the pool
if (!pendingRequest.isCompleted()) {
if (!this._acquireRequests[key]) {
this._acquireRequests[key] = []
}
this._acquireRequests[key].unshift(pendingRequest)
}
}
})
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ describe('NodeChannel', () => {
})
})

function createMockedChannel (connected, config = {}) {
function createMockedChannel (connected, config = { connectionTimeout: 30000 }) {
let endCallback = null
const address = ServerAddress.fromUrl('bolt://localhost:9999')
const channelConfig = new ChannelConfig(address, config, SERVICE_UNAVAILABLE)
Expand Down
41 changes: 41 additions & 0 deletions packages/bolt-connection/test/pool/pool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,40 @@ describe('#unit Pool', () => {
expect(resource1.observer).toBeFalsy()
expect(resource2.observer).toBeFalsy()
})

it('should thrown aquisition timeout exception if resource takes longer to be created', async () => {
const address = ServerAddress.fromUrl('bolt://localhost:7687')
const acquisitionTimeout = 1000
let counter = 0

const pool = new Pool({
create: (server, release) =>
new Promise(resolve => setTimeout(
() => resolve(new Resource(server, counter++, release))
, acquisitionTimeout + 10)),
destroy: res => Promise.resolve(),
validate: resourceValidOnlyOnceValidationFunction,
config: new PoolConfig(1, acquisitionTimeout)
})

try {
await pool.acquire(address)
fail('should have thrown')
} catch (e) {
expect(e).toEqual(
newError(
`Connection acquisition timed out in ${acquisitionTimeout} ms. `
+ 'Pool status: Active conn count = 0, Idle conn count = 0.'
)
)

const numberOfIdleResourceAfterResourceGetCreated = await new Promise(resolve =>
setTimeout(() => resolve(idleResources(pool, address)), 11))

expect(numberOfIdleResourceAfterResourceGetCreated).toEqual(1)
expect(counter).toEqual(1)
}
})
})

function expectNoPendingAcquisitionRequests (pool) {
Expand All @@ -895,6 +929,13 @@ function expectNoIdleResources (pool, address) {
}
}

function idleResources (pool, address) {
if (pool.has(address)) {
return pool._pools[address.asKey()].length
}
return undefined
}

function expectNumberOfAcquisitionRequests (pool, address, expectedNumber) {
expect(pool._acquireRequests[address.asKey()].length).toEqual(expectedNumber)
}
Expand Down
40 changes: 37 additions & 3 deletions packages/core/src/driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
ACCESS_MODE_READ,
ACCESS_MODE_WRITE,
FETCH_ALL,
DEFAULT_CONNECTION_TIMEOUT_MILLIS,
DEFAULT_POOL_ACQUISITION_TIMEOUT,
DEFAULT_POOL_MAX_SIZE
} from './internal/constants'
Expand Down Expand Up @@ -131,12 +132,15 @@ class Driver {
createSession: CreateSession = args => new Session(args)
) {
sanitizeConfig(config)
validateConfig(config)

const log = Logger.create(config)

validateConfig(config, log)

this._id = idGenerator++
this._meta = meta
this._config = config
this._log = Logger.create(config)
this._log = log;
this._createConnectionProvider = createConnectonProvider
this._createSession = createSession

Expand Down Expand Up @@ -366,13 +370,22 @@ class Driver {
* @private
* @returns {Object} the given config.
*/
function validateConfig(config: any): any {
function validateConfig(config: any, log: Logger): any {
const resolver = config.resolver
if (resolver && typeof resolver !== 'function') {
throw new TypeError(
`Configured resolver should be a function. Got: ${resolver}`
)
}

if (config.connectionAcquisitionTimeout < config.connectionTimeout) {
log.warn(
'Configuration for "connectionAcquisitionTimeout" should be greater than ' +
'or equal to "connectionTimeout". Otherwise, the connection acquisition ' +
'timeout will take precedence for over the connection timeout in scenarios ' +
'where a new connection is created while it is acquired'
)
}
return config
}

Expand All @@ -396,6 +409,7 @@ function sanitizeConfig(config: any) {
config.fetchSize,
DEFAULT_FETCH_SIZE
)
config.connectionTimeout = extractConnectionTimeout(config)
}

/**
Expand Down Expand Up @@ -431,6 +445,26 @@ function validateFetchSizeValue(
}
}

/**
* @private
*/
function extractConnectionTimeout (config: any): number|null {
const configuredTimeout = parseInt(config.connectionTimeout, 10)
if (configuredTimeout === 0) {
// timeout explicitly configured to 0
return null
} else if (configuredTimeout && configuredTimeout < 0) {
// timeout explicitly configured to a negative value
return null
} else if (!configuredTimeout) {
// timeout not configured, use default value
return DEFAULT_CONNECTION_TIMEOUT_MILLIS
} else {
// timeout configured, use the provided value
return configuredTimeout
}
}

/**
* @private
* @returns {ConfiguredCustomResolver} new custom resolver that wraps the passed-in resolver function.
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/internal/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
const FETCH_ALL = -1
const DEFAULT_POOL_ACQUISITION_TIMEOUT = 60 * 1000 // 60 seconds
const DEFAULT_POOL_MAX_SIZE = 100
const DEFAULT_CONNECTION_TIMEOUT_MILLIS = 30000 // 30 seconds by default

const ACCESS_MODE_READ: 'READ' = 'READ'
const ACCESS_MODE_WRITE: 'WRITE' = 'WRITE'
Expand All @@ -37,6 +38,7 @@ export {
FETCH_ALL,
ACCESS_MODE_READ,
ACCESS_MODE_WRITE,
DEFAULT_CONNECTION_TIMEOUT_MILLIS,
DEFAULT_POOL_ACQUISITION_TIMEOUT,
DEFAULT_POOL_MAX_SIZE,
BOLT_PROTOCOL_V1,
Expand Down
40 changes: 40 additions & 0 deletions packages/core/test/driver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import Driver from '../src/driver'
import { Bookmarks } from '../src/internal/bookmarks'
import { Logger } from '../src/internal/logger'
import { ConfiguredCustomResolver } from '../src/internal/resolver'
import { LogLevel } from '../src/types'

describe('Driver', () => {
let driver: Driver | null
Expand Down Expand Up @@ -155,6 +156,44 @@ describe('Driver', () => {
expect(driver.isEncrypted()).toEqual(expectedValue)
})

it.each([
[{ connectionTimeout: 30000, connectionAcquisitionTimeout: 60000 }, true],
[{ connectionTimeout: null, connectionAcquisitionTimeout: 60000 }, true],
[{ connectionTimeout: 30000, connectionAcquisitionTimeout: null }, true],
[{ connectionTimeout: null, connectionAcquisitionTimeout: null }, true],
[{ connectionAcquisitionTimeout: 60000 }, true],
[{ connectionTimeout: 30000 }, true],
[{}, true],
[{ connectionTimeout: 30000, connectionAcquisitionTimeout: 20000 }, false],
[{ connectionAcquisitionTimeout: 20000 }, false],
[{ connectionTimeout: 70000 }, false],
// No connection timeouts should be considered valid, since it means
// the user doesn't case about the connection timeout at all.
[{ connectionTimeout: 0, connectionAcquisitionTimeout: 2000 }, true],
[{ connectionTimeout: -1, connectionAcquisitionTimeout: 2000 }, true],
])('should emit warning if `connectionAcquisitionTimeout` and `connectionTimeout` are conflicting. [%o} ', async (config, valid) => {
const logging = {
level: 'warn' as LogLevel,
logger: jest.fn()
}

const driver = new Driver(META_INFO, { ...config, logging }, mockCreateConnectonProvider(new ConnectionProvider()), createSession)

if (valid) {
expect(logging.logger).not.toHaveBeenCalled()
} else {
expect(logging.logger).toHaveBeenCalledWith(
'warn',
'Configuration for "connectionAcquisitionTimeout" should be greater than ' +
'or equal to "connectionTimeout". Otherwise, the connection acquisition ' +
'timeout will take precedence for over the connection timeout in scenarios ' +
'where a new connection is created while it is acquired'
)
}

await driver.close()
})

function mockCreateConnectonProvider(connectionProvider: ConnectionProvider) {
return (
id: number,
Expand All @@ -172,6 +211,7 @@ describe('Driver', () => {
fetchSize: 1000,
maxConnectionLifetime: 3600000,
maxConnectionPoolSize: 100,
connectionTimeout: 30000,
},
connectionProvider,
database: '',
Expand Down
2 changes: 1 addition & 1 deletion packages/neo4j-driver/test/driver.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ describe('#integration driver', () => {
// Given
const config = {
maxConnectionPoolSize: 2,
connectionAcquisitionTimeout: 0,
connectionAcquisitionTimeout: 1000,
encrypted: false
}
driver = neo4j.driver(
Expand Down
Loading