From 78b47c93dd79f4cb6cb67c322f612788f7a1b2a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Barc=C3=A9los?= Date: Tue, 27 Aug 2024 10:53:11 +0200 Subject: [PATCH 01/10] Initial moving parts --- packages/core/src/internal/index.ts | 4 +- .../src/internal/pool/index.ts} | 0 .../src/internal/pool/pool-config.ts} | 20 +-- .../src/internal/pool/pool.ts} | 121 ++++++++++++------ .../test/internal/pool/pool.test.ts} | 13 +- 5 files changed, 104 insertions(+), 54 deletions(-) rename packages/{bolt-connection/src/pool/index.js => core/src/internal/pool/index.ts} (100%) rename packages/{bolt-connection/src/pool/pool-config.js => core/src/internal/pool/pool-config.ts} (72%) rename packages/{bolt-connection/src/pool/pool.js => core/src/internal/pool/pool.ts} (80%) rename packages/{bolt-connection/test/pool/pool.test.js => core/test/internal/pool/pool.test.ts} (99%) diff --git a/packages/core/src/internal/index.ts b/packages/core/src/internal/index.ts index c07c301e2..92e99730e 100644 --- a/packages/core/src/internal/index.ts +++ b/packages/core/src/internal/index.ts @@ -29,6 +29,7 @@ import * as serverAddress from './server-address' import * as resolver from './resolver' import * as objectUtil from './object-util' import * as boltAgent from './bolt-agent/index' +import * as pool from './pool' export { util, @@ -44,5 +45,6 @@ export { serverAddress, resolver, objectUtil, - boltAgent + boltAgent, + pool } diff --git a/packages/bolt-connection/src/pool/index.js b/packages/core/src/internal/pool/index.ts similarity index 100% rename from packages/bolt-connection/src/pool/index.js rename to packages/core/src/internal/pool/index.ts diff --git a/packages/bolt-connection/src/pool/pool-config.js b/packages/core/src/internal/pool/pool-config.ts similarity index 72% rename from packages/bolt-connection/src/pool/pool-config.js rename to packages/core/src/internal/pool/pool-config.ts index a31e72076..6582f4882 100644 --- a/packages/bolt-connection/src/pool/pool-config.js +++ b/packages/core/src/internal/pool/pool-config.ts @@ -19,7 +19,10 @@ const DEFAULT_MAX_SIZE = 100 const DEFAULT_ACQUISITION_TIMEOUT = 60 * 1000 // 60 seconds export default class PoolConfig { - constructor (maxSize, acquisitionTimeout) { + public readonly maxSize: number + public readonly acquisitionTimeout: number + + constructor (maxSize: number, acquisitionTimeout: number) { this.maxSize = valueOrDefault(maxSize, DEFAULT_MAX_SIZE) this.acquisitionTimeout = valueOrDefault( acquisitionTimeout, @@ -31,15 +34,14 @@ export default class PoolConfig { return new PoolConfig(DEFAULT_MAX_SIZE, DEFAULT_ACQUISITION_TIMEOUT) } - static fromDriverConfig (config) { - const maxSizeConfigured = isConfigured(config.maxConnectionPoolSize) - const maxSize = maxSizeConfigured + static fromDriverConfig (config: { maxConnectionPoolSize?: number, connectionAcquisitionTimeout?: number} ) { + const maxSize = isConfigured(config.maxConnectionPoolSize) ? config.maxConnectionPoolSize : DEFAULT_MAX_SIZE - const acquisitionTimeoutConfigured = isConfigured( + + const acquisitionTimeout = isConfigured( config.connectionAcquisitionTimeout ) - const acquisitionTimeout = acquisitionTimeoutConfigured ? config.connectionAcquisitionTimeout : DEFAULT_ACQUISITION_TIMEOUT @@ -47,12 +49,12 @@ export default class PoolConfig { } } -function valueOrDefault (value, defaultValue) { +function valueOrDefault (value: number | undefined, defaultValue: number) { return value === 0 || value ? value : defaultValue } -function isConfigured (value) { - return value === 0 || value +function isConfigured (value?: number): value is number { + return value === 0 || value != null } export { DEFAULT_MAX_SIZE, DEFAULT_ACQUISITION_TIMEOUT } diff --git a/packages/bolt-connection/src/pool/pool.js b/packages/core/src/internal/pool/pool.ts similarity index 80% rename from packages/bolt-connection/src/pool/pool.js rename to packages/core/src/internal/pool/pool.ts index 6d3bd38fb..cb27fb0cd 100644 --- a/packages/bolt-connection/src/pool/pool.js +++ b/packages/core/src/internal/pool/pool.ts @@ -16,13 +16,47 @@ */ import PoolConfig from './pool-config' -import { newError, internal } from 'neo4j-driver-core' - -const { - logger: { Logger } -} = internal +import { newError } from '../../error' +import { Logger } from '../logger' +import { ServerAddress } from '../server-address' + + +type Release = (address: ServerAddress, resource: unknown) => Promise +type Create = (acquisitionContext: unknown, address: ServerAddress, release: Release) => Promise +type Destroy = (resource: unknown) => Promise +type ValidateOnAcquire = (acquisitionContext: unknown, resource: unknown) => Promise | boolean +type ValidateOnRelease = (resource: unknown) => Promise | boolean +type InstallObserver = (resource: unknown, observer: unknown) => void +type RemoveObserver = (resource: unknown) => void +type AcquisitionConfig = { requireNew?: boolean } + +type ConstructorParam = { + create?: Create + destroy?: Destroy + validateOnAcquire?: ValidateOnAcquire + validateOnRelease?: ValidateOnRelease + installIdleObserver?: InstallObserver + removeIdleObserver?: RemoveObserver + config?: PoolConfig + log?: Logger +} class Pool { + private readonly _create: Create + private readonly _destroy: Destroy + private readonly _validateOnAcquire: ValidateOnAcquire + private readonly _validateOnRelease: ValidateOnRelease + private readonly _installIdleObserver: InstallObserver + private readonly _removeIdleObserver: RemoveObserver + private readonly _maxSize: number + private readonly _acquisitionTimeout: number + private readonly _log: Logger + private readonly _pools: { [key: string]: SingleAddressPool } + private readonly _pendingCreates: { [key: string]: number } + private readonly _acquireRequests: { [key: string]: PendingRequest[] } + private readonly _activeResourceCounts: { [key: string]: number } + private _closed: boolean + /** * @param {function(acquisitionContext: object, address: ServerAddress, function(address: ServerAddress, resource: object): Promise): Promise} create * an allocation function that creates a promise with a new resource. It's given an address for which to @@ -52,7 +86,7 @@ class Pool { removeIdleObserver = conn => {}, config = PoolConfig.defaultConfig(), log = Logger.noOp() - } = {}) { + }: ConstructorParam) { this._create = create this._destroy = destroy this._validateOnAcquire = validateOnAcquire @@ -78,7 +112,7 @@ class Pool { * @param {boolean} config.requireNew Indicate it requires a new resource * @return {Promise} resource that is ready to use. */ - acquire (acquisitionContext, address, config) { + acquire (acquisitionContext: unknown, address: ServerAddress, config?: AcquisitionConfig): Promise { const key = address.asKey() // We're out of resources and will try to acquire later on when an existing resource is released. @@ -88,7 +122,7 @@ class Pool { allRequests[key] = [] } return new Promise((resolve, reject) => { - let request = null + let request: PendingRequest | undefined const timeoutId = setTimeout(() => { // acquisition timeout fired @@ -100,13 +134,13 @@ class Pool { allRequests[key] = pendingRequests.filter(item => item !== request) } - if (request.isCompleted()) { + if (request?.isCompleted()) { // request already resolved/rejected by the release operation; nothing to do } else { // request is still pending and needs to be failed const activeCount = this.activeResourceCount(address) const idleCount = this.has(address) ? this._pools[key].length : 0 - request.reject( + request?.reject( newError( `Connection acquisition timed out in ${this._acquisitionTimeout} ms. Pool status: Active conn count = ${activeCount}, Idle conn count = ${idleCount}.` ) @@ -126,11 +160,11 @@ class Pool { * @param {ServerAddress} address the address of the server to purge its pool. * @returns {Promise} A promise that is resolved when the resources are purged */ - purge (address) { + purge (address: ServerAddress): Promise { return this._purgeKey(address.asKey()) } - apply (address, resourceConsumer) { + apply (address: ServerAddress, resourceConsumer: (resource: unknown) => void): void { const key = address.asKey() if (key in this._pools) { @@ -142,12 +176,12 @@ class Pool { * Destroy all idle resources in this pool. * @returns {Promise} A promise that is resolved when the resources are purged */ - async close () { + async close (): Promise { this._closed = true /** * The lack of Promise consuming was making the driver do not close properly in the scenario * captured at result.test.js:it('should handle missing onCompleted'). The test was timing out - * because while wainting for the driver close. + * because while waiting for the driver close. * * Consuming the Promise.all or by calling then or by awaiting in the result inside this method solved * the issue somehow. @@ -157,19 +191,19 @@ class Pool { */ return await Promise.all( Object.keys(this._pools).map(key => this._purgeKey(key)) - ) + ).then() } /** * Keep the idle resources for the provided addresses and purge the rest. * @returns {Promise} A promise that is resolved when the other resources are purged */ - keepAll (addresses) { + async keepAll (addresses: ServerAddress[]): Promise { const keysToKeep = addresses.map(a => a.asKey()) const keysPresent = Object.keys(this._pools) const keysToPurge = keysPresent.filter(k => keysToKeep.indexOf(k) === -1) - return Promise.all(keysToPurge.map(key => this._purgeKey(key))) + return await Promise.all(keysToPurge.map(key => this._purgeKey(key))).then() } /** @@ -177,7 +211,7 @@ class Pool { * @param {ServerAddress} address the address of the server to check. * @return {boolean} `true` when pool contains entries for the given key, false otherwise. */ - has (address) { + has (address: ServerAddress): boolean { return address.asKey() in this._pools } @@ -186,11 +220,11 @@ class Pool { * @param {ServerAddress} address the address of the server to check. * @return {number} count of resources acquired by clients. */ - activeResourceCount (address) { + activeResourceCount (address: ServerAddress): number { return this._activeResourceCounts[address.asKey()] || 0 } - _getOrInitializePoolFor (key) { + _getOrInitializePoolFor (key: string): SingleAddressPool { let pool = this._pools[key] if (!pool) { pool = new SingleAddressPool() @@ -200,7 +234,7 @@ class Pool { return pool } - async _acquire (acquisitionContext, address, requireNew) { + async _acquire (acquisitionContext: unknown, address: ServerAddress, requireNew: boolean): Promise<{ resource: unknown, pool: SingleAddressPool }> { if (this._closed) { throw newError('Pool is closed, it is no more able to serve requests.') } @@ -269,7 +303,7 @@ class Pool { return { resource, pool } } - async _release (address, resource, pool) { + async _release (address: ServerAddress, resource: unknown, pool: SingleAddressPool): Promise { const key = address.asKey() try { @@ -286,7 +320,7 @@ class Pool { } else { if (this._installIdleObserver) { this._installIdleObserver(resource, { - onError: error => { + onError: (error: Error) => { this._log.debug( `Idle connection ${resource} destroyed because of error: ${error}` ) @@ -324,7 +358,7 @@ class Pool { } } - async _purgeKey (key) { + async _purgeKey (key: string): Promise { const pool = this._pools[key] const destructionList = [] if (pool) { @@ -341,7 +375,7 @@ class Pool { } } - _processPendingAcquireRequests (address) { + _processPendingAcquireRequests (address: ServerAddress): void { const key = address.asKey() const requests = this._acquireRequests[key] if (requests) { @@ -390,7 +424,7 @@ class Pool { * @param {string} key the resource group identifier (server address for connections). * @param {Object.} activeResourceCounts the object holding active counts per key. */ -function resourceAcquired (key, activeResourceCounts) { +function resourceAcquired (key: string, activeResourceCounts: { [key: string]: number }): void { const currentCount = activeResourceCounts[key] || 0 activeResourceCounts[key] = currentCount + 1 } @@ -400,7 +434,7 @@ function resourceAcquired (key, activeResourceCounts) { * @param {string} key the resource group identifier (server address for connections). * @param {Object.} activeResourceCounts the object holding active counts per key. */ -function resourceReleased (key, activeResourceCounts) { +function resourceReleased (key: string, activeResourceCounts: { [key: string]: number }): void { const currentCount = activeResourceCounts[key] || 0 const nextCount = currentCount - 1 @@ -412,7 +446,16 @@ function resourceReleased (key, activeResourceCounts) { } class PendingRequest { - constructor (key, context, config, resolve, reject, timeoutId, log) { + private readonly _key: string + private readonly _context: unknown + private readonly _config: AcquisitionConfig + private readonly _resolve: (resource: unknown) => void + private readonly _reject: (error: Error) => void + private readonly _timeoutId: number + private readonly _log: Logger + private _completed: boolean + + constructor (key: string, context: unknown, config: AcquisitionConfig | undefined, resolve: (resource: unknown) => void, reject: (error: Error) => void, timeoutId: number, log: Logger) { this._key = key this._context = context this._resolve = resolve @@ -435,12 +478,13 @@ class PendingRequest { return this._completed } - resolve (resource) { + resolve (resource: unknown) { if (this._completed) { return } this._completed = true + // @ts-expect-error clearTimeout(this._timeoutId) if (this._log.isDebugEnabled()) { this._log.debug(`${resource} acquired from the pool ${this._key}`) @@ -448,18 +492,23 @@ class PendingRequest { this._resolve(resource) } - reject (error) { + reject (error: Error) { if (this._completed) { return } this._completed = true + // @ts-expect-error clearTimeout(this._timeoutId) this._reject(error) } } class SingleAddressPool { + private _active: boolean + private _elements: unknown[] + private _elementsInUse: Set + constructor () { this._active = true this._elements = [] @@ -476,12 +525,12 @@ class SingleAddressPool { this._elementsInUse = new Set() } - filter (predicate) { + filter (predicate: (resource: unknown) => boolean) { this._elements = this._elements.filter(predicate) return this } - apply (resourceConsumer) { + apply (resourceConsumer: (resource: unknown) => void) { this._elements.forEach(resourceConsumer) this._elementsInUse.forEach(resourceConsumer) } @@ -490,22 +539,22 @@ class SingleAddressPool { return this._elements.length } - pop () { + pop (): unknown { const element = this._elements.pop() this._elementsInUse.add(element) return element } - push (element) { + push (element: unknown): number { this._elementsInUse.delete(element) return this._elements.push(element) } - pushInUse (element) { + pushInUse (element: unknown): void { this._elementsInUse.add(element) } - removeInUse (element) { + removeInUse (element: unknown): void { this._elementsInUse.delete(element) } } diff --git a/packages/bolt-connection/test/pool/pool.test.js b/packages/core/test/internal/pool/pool.test.ts similarity index 99% rename from packages/bolt-connection/test/pool/pool.test.js rename to packages/core/test/internal/pool/pool.test.ts index 581b7f608..2ce6f0ee5 100644 --- a/packages/bolt-connection/test/pool/pool.test.js +++ b/packages/core/test/internal/pool/pool.test.ts @@ -15,13 +15,10 @@ * limitations under the License. */ -import Pool from '../../src/pool/pool' -import PoolConfig from '../../src/pool/pool-config' -import { newError, error, internal } from 'neo4j-driver-core' - -const { - serverAddress: { ServerAddress } -} = internal +import Pool from '../../../src/internal/pool/pool' +import PoolConfig from '../../../src/internal/pool/pool-config' +import { ServerAddress } from '../../../src/internal/server-address' +import { newError, error } from '../../../src' const { SERVICE_UNAVAILABLE } = error @@ -31,7 +28,7 @@ describe('#unit Pool', () => { let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') const pool = new Pool({ - create: (_, server, release) => + create: (_: unknown, server: ServerAddress, release: (address: ServerAddress, resource: unknown) => Promise ) => Promise.resolve(new Resource(server, counter++, release)) }) From 4e4a2d8d5047c905319389e81fe3e450e6e50709 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Barc=C3=A9los?= Date: Tue, 27 Aug 2024 14:55:53 +0200 Subject: [PATCH 02/10] Fix types and code --- packages/core/src/internal/pool/pool.ts | 217 +++++---- packages/core/test/internal/pool/pool.test.ts | 456 +++++++++--------- 2 files changed, 356 insertions(+), 317 deletions(-) diff --git a/packages/core/src/internal/pool/pool.ts b/packages/core/src/internal/pool/pool.ts index cb27fb0cd..5fb447d2b 100644 --- a/packages/core/src/internal/pool/pool.ts +++ b/packages/core/src/internal/pool/pool.ts @@ -20,40 +20,39 @@ import { newError } from '../../error' import { Logger } from '../logger' import { ServerAddress } from '../server-address' - -type Release = (address: ServerAddress, resource: unknown) => Promise -type Create = (acquisitionContext: unknown, address: ServerAddress, release: Release) => Promise -type Destroy = (resource: unknown) => Promise -type ValidateOnAcquire = (acquisitionContext: unknown, resource: unknown) => Promise | boolean -type ValidateOnRelease = (resource: unknown) => Promise | boolean -type InstallObserver = (resource: unknown, observer: unknown) => void -type RemoveObserver = (resource: unknown) => void -type AcquisitionConfig = { requireNew?: boolean } - -type ConstructorParam = { - create?: Create - destroy?: Destroy - validateOnAcquire?: ValidateOnAcquire - validateOnRelease?: ValidateOnRelease - installIdleObserver?: InstallObserver - removeIdleObserver?: RemoveObserver +type Release = (address: ServerAddress, resource: R) => Promise +type Create = (acquisitionContext: unknown, address: ServerAddress, release: Release) => Promise +type Destroy = (resource: R) => Promise +type ValidateOnAcquire = (acquisitionContext: unknown, resource: R) => (Promise | boolean) +type ValidateOnRelease = (resource: R) => (Promise | boolean) +type InstallObserver = (resource: R, observer: unknown) => void +type RemoveObserver = (resource: R) => void +interface AcquisitionConfig { requireNew?: boolean } + +interface ConstructorParam { + create?: Create + destroy?: Destroy + validateOnAcquire?: ValidateOnAcquire + validateOnRelease?: ValidateOnRelease + installIdleObserver?: InstallObserver + removeIdleObserver?: RemoveObserver config?: PoolConfig log?: Logger } -class Pool { - private readonly _create: Create - private readonly _destroy: Destroy - private readonly _validateOnAcquire: ValidateOnAcquire - private readonly _validateOnRelease: ValidateOnRelease - private readonly _installIdleObserver: InstallObserver - private readonly _removeIdleObserver: RemoveObserver +class Pool { + private readonly _create: Create + private readonly _destroy: Destroy + private readonly _validateOnAcquire: ValidateOnAcquire + private readonly _validateOnRelease: ValidateOnRelease + private readonly _installIdleObserver: InstallObserver + private readonly _removeIdleObserver: RemoveObserver private readonly _maxSize: number private readonly _acquisitionTimeout: number private readonly _log: Logger - private readonly _pools: { [key: string]: SingleAddressPool } + private readonly _pools: { [key: string]: SingleAddressPool } private readonly _pendingCreates: { [key: string]: number } - private readonly _acquireRequests: { [key: string]: PendingRequest[] } + private readonly _acquireRequests: { [key: string]: Array> } private readonly _activeResourceCounts: { [key: string]: number } private _closed: boolean @@ -78,15 +77,15 @@ class Pool { * @param {Logger} log the driver logger. */ constructor ({ - create = (acquisitionContext, address, release) => Promise.resolve(), - destroy = conn => Promise.resolve(), + create = async (acquisitionContext, address, release) => await Promise.reject(new Error('Not implemented')), + destroy = async conn => await Promise.resolve(), validateOnAcquire = (acquisitionContext, conn) => true, validateOnRelease = (conn) => true, installIdleObserver = (conn, observer) => {}, removeIdleObserver = conn => {}, config = PoolConfig.defaultConfig(), log = Logger.noOp() - }: ConstructorParam) { + }: ConstructorParam) { this._create = create this._destroy = destroy this._validateOnAcquire = validateOnAcquire @@ -112,35 +111,33 @@ class Pool { * @param {boolean} config.requireNew Indicate it requires a new resource * @return {Promise} resource that is ready to use. */ - acquire (acquisitionContext: unknown, address: ServerAddress, config?: AcquisitionConfig): Promise { + async acquire (acquisitionContext: unknown, address: ServerAddress, config?: AcquisitionConfig): Promise { const key = address.asKey() // We're out of resources and will try to acquire later on when an existing resource is released. const allRequests = this._acquireRequests const requests = allRequests[key] - if (!requests) { + if (requests == null) { allRequests[key] = [] } - return new Promise((resolve, reject) => { - let request: PendingRequest | undefined - + return await new Promise((resolve, reject) => { const timeoutId = setTimeout(() => { // acquisition timeout fired // remove request from the queue of pending requests, if it's still there // request might've been taken out by the release operation const pendingRequests = allRequests[key] - if (pendingRequests) { + if (pendingRequests != null) { allRequests[key] = pendingRequests.filter(item => item !== request) } - if (request?.isCompleted()) { + if (request.isCompleted()) { // request already resolved/rejected by the release operation; nothing to do } else { // request is still pending and needs to be failed const activeCount = this.activeResourceCount(address) const idleCount = this.has(address) ? this._pools[key].length : 0 - request?.reject( + request.reject( newError( `Connection acquisition timed out in ${this._acquisitionTimeout} ms. Pool status: Active conn count = ${activeCount}, Idle conn count = ${idleCount}.` ) @@ -149,7 +146,7 @@ class Pool { }, this._acquisitionTimeout) typeof timeoutId === 'object' && timeoutId.unref() - request = new PendingRequest(key, acquisitionContext, config, resolve, reject, timeoutId, this._log) + const request = new PendingRequest(key, acquisitionContext, config, resolve, reject, timeoutId, this._log) allRequests[key].push(request) this._processPendingAcquireRequests(address) }) @@ -160,11 +157,11 @@ class Pool { * @param {ServerAddress} address the address of the server to purge its pool. * @returns {Promise} A promise that is resolved when the resources are purged */ - purge (address: ServerAddress): Promise { - return this._purgeKey(address.asKey()) + async purge (address: ServerAddress): Promise { + return await this._purgeKey(address.asKey()) } - apply (address: ServerAddress, resourceConsumer: (resource: unknown) => void): void { + apply (address: ServerAddress, resourceConsumer: (resource: R) => void): void { const key = address.asKey() if (key in this._pools) { @@ -190,7 +187,7 @@ class Pool { * seems to be need also. */ return await Promise.all( - Object.keys(this._pools).map(key => this._purgeKey(key)) + Object.keys(this._pools).map(async key => await this._purgeKey(key)) ).then() } @@ -201,9 +198,9 @@ class Pool { async keepAll (addresses: ServerAddress[]): Promise { const keysToKeep = addresses.map(a => a.asKey()) const keysPresent = Object.keys(this._pools) - const keysToPurge = keysPresent.filter(k => keysToKeep.indexOf(k) === -1) + const keysToPurge = keysPresent.filter(k => !keysToKeep.includes(k)) - return await Promise.all(keysToPurge.map(key => this._purgeKey(key))).then() + return await Promise.all(keysToPurge.map(async key => await this._purgeKey(key))).then() } /** @@ -221,20 +218,20 @@ class Pool { * @return {number} count of resources acquired by clients. */ activeResourceCount (address: ServerAddress): number { - return this._activeResourceCounts[address.asKey()] || 0 + return this._activeResourceCounts[address.asKey()] ?? 0 } - _getOrInitializePoolFor (key: string): SingleAddressPool { + _getOrInitializePoolFor (key: string): SingleAddressPool { let pool = this._pools[key] - if (!pool) { - pool = new SingleAddressPool() + if (pool == null) { + pool = new SingleAddressPool() this._pools[key] = pool this._pendingCreates[key] = 0 } return pool } - async _acquire (acquisitionContext: unknown, address: ServerAddress, requireNew: boolean): Promise<{ resource: unknown, pool: SingleAddressPool }> { + async _acquire (acquisitionContext: unknown, address: ServerAddress, requireNew: boolean): Promise<{ resource: R | null, pool: SingleAddressPool }> { if (this._closed) { throw newError('Pool is closed, it is no more able to serve requests.') } @@ -242,10 +239,14 @@ class Pool { const key = address.asKey() const pool = this._getOrInitializePoolFor(key) if (!requireNew) { - while (pool.length) { + while (pool.length > 0) { const resource = pool.pop() - if (this._removeIdleObserver) { + if (resource == null) { + continue + } + + if (this._removeIdleObserver != null) { this._removeIdleObserver(resource) } @@ -253,6 +254,7 @@ class Pool { // idle resource is valid and can be acquired resourceAcquired(key, this._activeResourceCounts) if (this._log.isDebugEnabled()) { + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions this._log.debug(`${resource} acquired from the pool ${key}`) } return { resource, pool } @@ -283,18 +285,21 @@ class Pool { const numConnections = this.activeResourceCount(address) + pool.length if (numConnections >= this._maxSize && requireNew) { const resource = pool.pop() - if (this._removeIdleObserver) { - this._removeIdleObserver(resource) + if (resource != null) { + if (this._removeIdleObserver != null) { + this._removeIdleObserver(resource) + } + pool.removeInUse(resource) + await this._destroy(resource) } - pool.removeInUse(resource) - await this._destroy(resource) } // Invoke callback that creates actual connection - resource = await this._create(acquisitionContext, address, (address, resource) => this._release(address, resource, pool)) + resource = await this._create(acquisitionContext, address, async (address, resource) => await this._release(address, resource, pool)) pool.pushInUse(resource) resourceAcquired(key, this._activeResourceCounts) if (this._log.isDebugEnabled()) { + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions this._log.debug(`${resource} created for the pool ${key}`) } } finally { @@ -303,7 +308,7 @@ class Pool { return { resource, pool } } - async _release (address: ServerAddress, resource: unknown, pool: SingleAddressPool): Promise { + async _release (address: ServerAddress, resource: R, pool: SingleAddressPool): Promise { const key = address.asKey() try { @@ -312,20 +317,22 @@ class Pool { if (!await this._validateOnRelease(resource)) { if (this._log.isDebugEnabled()) { this._log.debug( + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions `${resource} destroyed and can't be released to the pool ${key} because it is not functional` ) } pool.removeInUse(resource) await this._destroy(resource) } else { - if (this._installIdleObserver) { + if (this._installIdleObserver != null) { this._installIdleObserver(resource, { onError: (error: Error) => { this._log.debug( + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions `Idle connection ${resource} destroyed because of error: ${error}` ) const pool = this._pools[key] - if (pool) { + if (pool != null) { this._pools[key] = pool.filter(r => r !== resource) pool.removeInUse(resource) } @@ -338,6 +345,7 @@ class Pool { } pool.push(resource) if (this._log.isDebugEnabled()) { + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions this._log.debug(`${resource} released to the pool ${key}`) } } @@ -345,6 +353,7 @@ class Pool { // key has been purged, don't put it back, just destroy the resource if (this._log.isDebugEnabled()) { this._log.debug( + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions `${resource} destroyed and can't be released to the pool ${key} because pool has been purged` ) } @@ -361,15 +370,20 @@ class Pool { async _purgeKey (key: string): Promise { const pool = this._pools[key] const destructionList = [] - if (pool) { - while (pool.length) { + if (pool != null) { + while (pool.length > 0) { const resource = pool.pop() - if (this._removeIdleObserver) { + if (resource == null) { + continue + } + + if (this._removeIdleObserver != null) { this._removeIdleObserver(resource) } destructionList.push(this._destroy(resource)) } pool.close() + // eslint-disable-next-line @typescript-eslint/no-dynamic-delete delete this._pools[key] await Promise.all(destructionList) } @@ -378,25 +392,33 @@ class Pool { _processPendingAcquireRequests (address: ServerAddress): void { const key = address.asKey() const requests = this._acquireRequests[key] - if (requests) { + if (requests != null) { const pendingRequest = requests.shift() // pop a pending acquire request - if (pendingRequest) { + if (pendingRequest != null) { this._acquire(pendingRequest.context, address, pendingRequest.requireNew) .catch(error => { // failed to acquire/create a new connection to resolve the pending acquire request // propagate the error by failing the pending request pendingRequest.reject(error) - return { resource: null } + return { resource: null, pool: null } }) .then(({ resource, pool }) => { - if (resource) { + // there is not situation where the pool resource is not null and the + // pool is null. + if (resource != null && pool != null) { // managed to acquire a valid resource from the pool if (pendingRequest.isCompleted()) { // request has been completed, most likely failed by a timeout // return the acquired resource back to the pool this._release(address, resource, pool) + .catch(error => { + if (this._log.isDebugEnabled()) { + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + this._log.debug(`${resource} could not be release back to the pool. Cause: ${error}`) + } + }) } else { // request is still pending and can be resolved with the newly acquired resource pendingRequest.resolve(resource) // resolve the pending request with the acquired resource @@ -405,14 +427,15 @@ class Pool { // failed to acquire a valid resource from the pool // return the pending request back to the pool if (!pendingRequest.isCompleted()) { - if (!this._acquireRequests[key]) { + if (this._acquireRequests[key] == null) { this._acquireRequests[key] = [] } this._acquireRequests[key].unshift(pendingRequest) } } - }) + }).catch(error => pendingRequest.reject(error)) } else { + // eslint-disable-next-line @typescript-eslint/no-dynamic-delete delete this._acquireRequests[key] } } @@ -425,7 +448,7 @@ class Pool { * @param {Object.} activeResourceCounts the object holding active counts per key. */ function resourceAcquired (key: string, activeResourceCounts: { [key: string]: number }): void { - const currentCount = activeResourceCounts[key] || 0 + const currentCount = activeResourceCounts[key] ?? 0 activeResourceCounts[key] = currentCount + 1 } @@ -435,27 +458,28 @@ function resourceAcquired (key: string, activeResourceCounts: { [key: string]: n * @param {Object.} activeResourceCounts the object holding active counts per key. */ function resourceReleased (key: string, activeResourceCounts: { [key: string]: number }): void { - const currentCount = activeResourceCounts[key] || 0 + const currentCount = activeResourceCounts[key] ?? 0 const nextCount = currentCount - 1 if (nextCount > 0) { activeResourceCounts[key] = nextCount } else { + // eslint-disable-next-line @typescript-eslint/no-dynamic-delete delete activeResourceCounts[key] } } -class PendingRequest { +class PendingRequest { private readonly _key: string private readonly _context: unknown private readonly _config: AcquisitionConfig - private readonly _resolve: (resource: unknown) => void + private readonly _resolve: (resource: R) => void private readonly _reject: (error: Error) => void - private readonly _timeoutId: number + private readonly _timeoutId: any private readonly _log: Logger private _completed: boolean - constructor (key: string, context: unknown, config: AcquisitionConfig | undefined, resolve: (resource: unknown) => void, reject: (error: Error) => void, timeoutId: number, log: Logger) { + constructor (key: string, context: unknown, config: AcquisitionConfig | undefined, resolve: (resource: R) => void, reject: (error: Error) => void, timeoutId: any, log: Logger) { this._key = key this._context = context this._resolve = resolve @@ -463,51 +487,50 @@ class PendingRequest { this._timeoutId = timeoutId this._log = log this._completed = false - this._config = config || {} + this._config = config ?? {} } - get context () { + get context (): unknown { return this._context } - get requireNew () { - return this._config.requireNew || false + get requireNew (): boolean { + return this._config.requireNew ?? false } - isCompleted () { + isCompleted (): boolean { return this._completed } - resolve (resource: unknown) { + resolve (resource: R): void { if (this._completed) { return } this._completed = true - // @ts-expect-error clearTimeout(this._timeoutId) if (this._log.isDebugEnabled()) { + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions this._log.debug(`${resource} acquired from the pool ${this._key}`) } this._resolve(resource) } - reject (error: Error) { + reject (error: Error): void { if (this._completed) { return } this._completed = true - // @ts-expect-error clearTimeout(this._timeoutId) this._reject(error) } } -class SingleAddressPool { +class SingleAddressPool { private _active: boolean - private _elements: unknown[] - private _elementsInUse: Set + private _elements: R[] + private _elementsInUse: Set constructor () { this._active = true @@ -515,46 +538,48 @@ class SingleAddressPool { this._elementsInUse = new Set() } - isActive () { + isActive (): boolean { return this._active } - close () { + close (): void { this._active = false this._elements = [] this._elementsInUse = new Set() } - filter (predicate: (resource: unknown) => boolean) { + filter (predicate: (resource: R) => boolean): SingleAddressPool { this._elements = this._elements.filter(predicate) return this } - apply (resourceConsumer: (resource: unknown) => void) { + apply (resourceConsumer: (resource: R) => void): void { this._elements.forEach(resourceConsumer) this._elementsInUse.forEach(resourceConsumer) } - get length () { + get length (): number { return this._elements.length } - pop (): unknown { + pop (): R | undefined { const element = this._elements.pop() - this._elementsInUse.add(element) + if (element != null) { + this._elementsInUse.add(element) + } return element } - push (element: unknown): number { + push (element: R): number { this._elementsInUse.delete(element) return this._elements.push(element) } - pushInUse (element: unknown): void { + pushInUse (element: R): void { this._elementsInUse.add(element) } - removeInUse (element: unknown): void { + removeInUse (element: R): void { this._elementsInUse.delete(element) } } diff --git a/packages/core/test/internal/pool/pool.test.ts b/packages/core/test/internal/pool/pool.test.ts index 2ce6f0ee5..60d6e01cb 100644 --- a/packages/core/test/internal/pool/pool.test.ts +++ b/packages/core/test/internal/pool/pool.test.ts @@ -27,9 +27,9 @@ describe('#unit Pool', () => { // Given let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_: unknown, server: ServerAddress, release: (address: ServerAddress, resource: unknown) => Promise ) => - Promise.resolve(new Resource(server, counter++, release)) + const pool = new Pool({ + create: async (_: unknown, server: ServerAddress, release: (address: ServerAddress, resource: Resource) => Promise) => + await Promise.resolve(new Resource(server, counter++, release)) }) // When @@ -46,9 +46,9 @@ describe('#unit Pool', () => { // Given a pool that allocates let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)) + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)) }) // When @@ -68,9 +68,9 @@ describe('#unit Pool', () => { let counter = 0 const address1 = ServerAddress.fromUrl('bolt://localhost:7687') const address2 = ServerAddress.fromUrl('bolt://localhost:7688') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)) + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)) }) // When @@ -94,14 +94,14 @@ describe('#unit Pool', () => { it('frees if validate returns false', async () => { // Given a pool that allocates let counter = 0 - const destroyed = [] + const destroyed: Resource[] = [] const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_acquisitionContext, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_acquisitionContext, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { destroyed.push(res) - return Promise.resolve() + return await Promise.resolve() }, validateOnRelease: res => false, config: new PoolConfig(1000, 60000) @@ -123,14 +123,14 @@ describe('#unit Pool', () => { it('should release resources and process acquisitions when destroy connection', async () => { // Given a pool that allocates let counter = 0 - const destroyed = [] + const destroyed: Resource[] = [] const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_acquisitionContext, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_acquisitionContext, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { destroyed.push(res) - return Promise.resolve() + return await Promise.resolve() }, validateOnRelease: res => false, config: new PoolConfig(2, 10000) @@ -169,14 +169,14 @@ describe('#unit Pool', () => { // Given a pool that allocates let counter = 0 const theMadeUpError = new Error('I made this error for testing') - const destroyed = [] + const destroyed: Resource[] = [] const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_acquisitionContext, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_acquisitionContext, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { destroyed.push(res) - return Promise.reject(theMadeUpError) + return await Promise.reject(theMadeUpError) }, validateOnRelease: res => false, config: new PoolConfig(2, 3000) @@ -215,14 +215,14 @@ describe('#unit Pool', () => { // Given a pool that allocates let counter = 0 const theMadeUpError = new Error('I made this error for testing') - const destroyed = [] + const destroyed: Resource[] = [] const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_acquisitionContext, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_acquisitionContext, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { destroyed.push(res) - return Promise.reject(theMadeUpError) + return await Promise.reject(theMadeUpError) }, validateOnRelease: res => true, config: new PoolConfig(2, 3000) @@ -263,16 +263,16 @@ describe('#unit Pool', () => { it('frees if validateOnRelease returns Promise.resolve(false)', async () => { // Given a pool that allocates let counter = 0 - const destroyed = [] + const destroyed: Resource[] = [] const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_acquisitionContext, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_acquisitionContext, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { destroyed.push(res) - return Promise.resolve() + return await Promise.resolve() }, - validateOnRelease: res => Promise.resolve(false), + validateOnRelease: async res => await Promise.resolve(false), config: new PoolConfig(1000, 60000) }) @@ -294,14 +294,14 @@ describe('#unit Pool', () => { let counter = 0 const destroyed = [] const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_acquisitionContext, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_acquisitionContext, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { destroyed.push(res) - return Promise.resolve() + return await Promise.resolve() }, - validateOnRelease: res => Promise.resolve(true), + validateOnRelease: async res => await Promise.resolve(true), config: new PoolConfig(1000, 60000) }) @@ -319,16 +319,16 @@ describe('#unit Pool', () => { it('frees if validateOnAcquire returns Promise.resolve(false)', async () => { // Given a pool that allocates let counter = 0 - const destroyed = [] + const destroyed: Resource[] = [] const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_acquisitionContext, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_acquisitionContext, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { destroyed.push(res) - return Promise.resolve() + return await Promise.resolve() }, - validateOnAcquire: res => Promise.resolve(false), + validateOnAcquire: async res => await Promise.resolve(false), config: new PoolConfig(1000, 60000) }) @@ -354,14 +354,14 @@ describe('#unit Pool', () => { let counter = 0 const destroyed = [] const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_acquisitionContext, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_acquisitionContext, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { destroyed.push(res) - return Promise.resolve() + return await Promise.resolve() }, - validateOnAcquire: res => Promise.resolve(true), + validateOnAcquire: async res => await Promise.resolve(true), config: new PoolConfig(1000, 60000) }) @@ -385,12 +385,12 @@ describe('#unit Pool', () => { let counter = 0 const address1 = ServerAddress.fromUrl('bolt://localhost:7687') const address2 = ServerAddress.fromUrl('bolt://localhost:7688') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { res.destroyed = true - return Promise.resolve() + return await Promise.resolve() } }) @@ -425,12 +425,12 @@ describe('#unit Pool', () => { let counter = 0 const address1 = ServerAddress.fromUrl('bolt://localhost:7687') const address2 = ServerAddress.fromUrl('bolt://localhost:7688') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { res.destroyed = true - return Promise.resolve() + return await Promise.resolve() } }) @@ -470,12 +470,12 @@ describe('#unit Pool', () => { it('destroys resource when key was purged', async () => { let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { res.destroyed = true - return Promise.resolve() + return await Promise.resolve() } }) @@ -495,12 +495,12 @@ describe('#unit Pool', () => { it('destroys resource when pool is purged even if a new pool is created for the same address', async () => { let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { res.destroyed = true - return Promise.resolve() + return await Promise.resolve() } }) @@ -537,12 +537,12 @@ describe('#unit Pool', () => { const address2 = ServerAddress.fromUrl('bolt://localhost:7688') const address3 = ServerAddress.fromUrl('bolt://localhost:7689') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { res.destroyed = true - return Promise.resolve() + return await Promise.resolve() } }) @@ -555,7 +555,7 @@ describe('#unit Pool', () => { pool.acquire({}, address3) ] const values = await Promise.all(acquiredResources) - await Promise.all(values.map(resource => resource.close())) + await Promise.all(values.map(async resource => await resource.close())) await pool.close() @@ -565,10 +565,10 @@ describe('#unit Pool', () => { it('should fail to acquire when closed', async () => { const address = ServerAddress.fromUrl('bolt://localhost:7687') const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, 0, release)), - destroy: res => { - return Promise.resolve() + create: async (_, server, release) => + await Promise.resolve(new Resource(server, 0, release)), + destroy: async res => { + return await Promise.resolve() } }) @@ -583,11 +583,11 @@ describe('#unit Pool', () => { it('should fail to acquire when closed with idle connections', async () => { const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, 0, release)), - destroy: res => { - return Promise.resolve() + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, 0, release)), + destroy: async res => { + return await Promise.resolve() } }) @@ -610,12 +610,12 @@ describe('#unit Pool', () => { const address2 = ServerAddress.fromUrl('bolt://localhost:7688') const address3 = ServerAddress.fromUrl('bolt://localhost:7689') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { res.destroyed = true - return Promise.resolve() + return await Promise.resolve() } }) @@ -647,12 +647,12 @@ describe('#unit Pool', () => { const address2 = ServerAddress.fromUrl('bolt://localhost:7688') const address3 = ServerAddress.fromUrl('bolt://localhost:7689') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { res.destroyed = true - return Promise.resolve() + return await Promise.resolve() } }) @@ -681,15 +681,15 @@ describe('#unit Pool', () => { let validated = true let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { res.destroyed = true - return Promise.resolve() + return await Promise.resolve() }, - validateOnAcquire: (context, _res) => { - if (context.triggerValidation) { + validateOnAcquire: (context: any, _res) => { + if (context.triggerValidation === true) { validated = !validated return validated } @@ -709,8 +709,8 @@ describe('#unit Pool', () => { const absentAddress = ServerAddress.fromUrl('bolt://localhost:7688') const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, 42, release)) + create: async (_, server, release) => + await Promise.resolve(new Resource(server, 42, release)) }) await pool.acquire({}, existingAddress) @@ -722,8 +722,8 @@ describe('#unit Pool', () => { it('reports zero active resources when empty', () => { const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, 42, release)) + create: async (_, server, release) => + await Promise.resolve(new Resource(server, 42, release)) }) expect( @@ -740,8 +740,8 @@ describe('#unit Pool', () => { it('reports active resources', async () => { const address = ServerAddress.fromUrl('bolt://localhost:7687') const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, 42, release)) + create: async (_, server, release) => + await Promise.resolve(new Resource(server, 42, release)) }) const acquiredResources = [ @@ -758,16 +758,16 @@ describe('#unit Pool', () => { it('reports active resources when they are acquired', async () => { const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, 42, release)) + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, 42, release)) }) // three new resources are created and returned to the pool const r0 = await pool.acquire({}, address) const r1 = await pool.acquire({}, address) const r2 = await pool.acquire({}, address) - await [r0, r1, r2].map(v => v.close()) + await [r0, r1, r2].map(async v => await v.close()) // three idle resources are acquired from the pool const acquiredResources = [ @@ -786,9 +786,9 @@ describe('#unit Pool', () => { it('does not report resources that are returned to the pool', async () => { const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, 42, release)) + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, 42, release)) }) const r0 = await pool.acquire({}, address) @@ -816,10 +816,10 @@ describe('#unit Pool', () => { let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => Promise.resolve(), + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => await Promise.resolve(), config: new PoolConfig(2, 5000) }) @@ -828,7 +828,7 @@ describe('#unit Pool', () => { setTimeout(() => { expectNumberOfAcquisitionRequests(pool, address, 1) - r1.close() + ignore(r1.close()) }, 1000) const r2 = await pool.acquire({}, address) @@ -840,9 +840,9 @@ describe('#unit Pool', () => { const address = ServerAddress.fromUrl('bolt://localhost:7687') const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => Promise.resolve(), + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => await Promise.resolve(), config: new PoolConfig(2, 1000) }) @@ -858,14 +858,14 @@ describe('#unit Pool', () => { const address = ServerAddress.fromUrl('bolt://localhost:7687') it('should consider pending connects when evaluating max pool size', async () => { - const conns = [] + const conns: any[] = [] const pool = new Pool({ // Hook into connection creation to track when and what connections that are // created. - create: (_, server, release) => { + create: async (_, server, release) => { // Create a fake connection that makes it possible control when it's connected // and released from the outer scope. - const conn = { + const conn: any = { server, release } @@ -876,7 +876,7 @@ describe('#unit Pool', () => { // Put the connection in a list in outer scope even though there only should be // one when the test is succeeding. conns.push(conn) - return promise + return await promise }, // Setup pool to only allow one connection config: new PoolConfig(1, 100000) @@ -908,10 +908,10 @@ describe('#unit Pool', () => { it('should not time out if max pool size is not set', async () => { let counter = 0 - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => Promise.resolve() + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => await Promise.resolve() }) await pool.acquire({}, address) @@ -927,10 +927,10 @@ describe('#unit Pool', () => { let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => Promise.resolve(), + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => await Promise.resolve(), config: new PoolConfig(2, acquisitionTimeout) }) @@ -944,8 +944,8 @@ describe('#unit Pool', () => { // double-release used to cause deletion of acquire requests in the pool and failure of the timeout // such background failure made this test fail, not the existing assertions setTimeout(() => { - resource1.close() - resource2.close() + ignore(resource1.close()) + ignore(resource2.close()) }, acquisitionTimeout) // Remember that both code paths are ok with this test, either a success with a valid resource @@ -969,10 +969,10 @@ describe('#unit Pool', () => { const acquisitionTimeout = 1000 let counter = 0 - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => Promise.resolve(), + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => await Promise.resolve(), validateOnAcquire: (_, res) => resourceValidOnlyOnceValidationFunction(res), validateOnRelease: resourceValidOnlyOnceValidationFunction, config: new PoolConfig(1, acquisitionTimeout) @@ -985,7 +985,7 @@ describe('#unit Pool', () => { // release the resource before the acquisition timeout, it should be treated as invalid setTimeout(() => { expectNumberOfAcquisitionRequests(pool, address, 1) - resource1.close() + ignore(resource1.close()) }, acquisitionTimeout / 2) const resource2 = await pool.acquire({}, address) @@ -999,10 +999,10 @@ describe('#unit Pool', () => { const acquisitionTimeout = 1000 let counter = 0 - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => Promise.resolve(), + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => await Promise.resolve(), validateOnAcquire: (_, res) => resourceValidOnlyOnceValidationFunction(res), validateOnRelease: resourceValidOnlyOnceValidationFunction, config: new PoolConfig(2, acquisitionTimeout) @@ -1019,8 +1019,8 @@ describe('#unit Pool', () => { // release both resources before the acquisition timeout, they should be treated as invalid setTimeout(() => { expectNumberOfAcquisitionRequests(pool, address, 1) - resource1.close() - resource2.close() + ignore(resource1.close()) + ignore(resource2.close()) }, acquisitionTimeout / 2) const resource3 = await pool.acquire({}, address) @@ -1035,10 +1035,10 @@ describe('#unit Pool', () => { let installIdleObserverCount = 0 let removeIdleObserverCount = 0 - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, resourceCount++, release)), - destroy: res => Promise.resolve(), + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, resourceCount++, release)), + destroy: async res => await Promise.resolve(), installIdleObserver: (resource, observer) => { installIdleObserverCount++ }, @@ -1050,7 +1050,7 @@ describe('#unit Pool', () => { const r1 = await pool.acquire({}, address) const r2 = await pool.acquire({}, address) const r3 = await pool.acquire({}, address) - await [r1, r2, r3].map(r => r.close()) + await [r1, r2, r3].map(async r => await r.close()) expect(installIdleObserverCount).toEqual(3) expect(removeIdleObserverCount).toEqual(0) @@ -1067,10 +1067,10 @@ describe('#unit Pool', () => { const address = ServerAddress.fromUrl('bolt://localhost:7687') let resourceCount = 0 - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, resourceCount++, release)), - destroy: res => Promise.resolve(), + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, resourceCount++, release)), + destroy: async res => await Promise.resolve(), installIdleObserver: (resource, observer) => { resource.observer = observer }, @@ -1106,10 +1106,10 @@ describe('#unit Pool', () => { const address = ServerAddress.fromUrl('bolt://localhost:7687') let resourceCount = 0 - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, resourceCount++, release)), - destroy: res => Promise.resolve(), + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, resourceCount++, release)), + destroy: async res => await Promise.resolve(), installIdleObserver: (resource, observer) => { resource.observer = observer }, @@ -1134,12 +1134,12 @@ describe('#unit Pool', () => { const acquisitionTimeout = 1000 let counter = 0 - const pool = new Pool({ - create: (_, server, release) => - new Promise(resolve => setTimeout( + const pool = new Pool({ + create: async (_, server, release) => + await new Promise(resolve => setTimeout( () => resolve(new Resource(server, counter++, release)) , acquisitionTimeout + 10)), - destroy: res => Promise.resolve(), + destroy: async res => await Promise.resolve(), validateOnAcquire: (_, res) => resourceValidOnlyOnceValidationFunction(res), validateOnRelease: resourceValidOnlyOnceValidationFunction, config: new PoolConfig(1, acquisitionTimeout) @@ -1167,24 +1167,24 @@ describe('#unit Pool', () => { it('should purge resources in parallel', async () => { const address = ServerAddress.fromUrl('bolt://localhost:7687') let resourceCount = 0 - const resourcesReleased = [] - let resolveRelease + const resourcesReleased: Resource[] = [] + let resolveRelease: (r: Resource) => void const releasePromise = new Promise((resolve) => { resolveRelease = resolve }) const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, resourceCount++, release)), - destroy: res => { + create: async (_, server, release) => + await Promise.resolve(new Resource(server, resourceCount++, release)), + destroy: async (res: Resource) => { resourcesReleased.push(res) resourceCount-- // Only destroy when the last resource // get destroyed if (resourceCount === 0) { - resolveRelease() + resolveRelease(res) } - return releasePromise + return await releasePromise.then() } }) @@ -1205,9 +1205,9 @@ describe('#unit Pool', () => { // Given let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)) + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)) }) // When @@ -1224,9 +1224,9 @@ describe('#unit Pool', () => { // Given a pool that allocates let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)) + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)) }) // When @@ -1244,10 +1244,10 @@ describe('#unit Pool', () => { it('should fail to acquire when closed', async () => { const address = ServerAddress.fromUrl('bolt://localhost:7687') const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, 0, release)), - destroy: res => { - return Promise.resolve() + create: async (_, server, release) => + await Promise.resolve(new Resource(server, 0, release)), + destroy: async res => { + return await Promise.resolve() } }) @@ -1262,11 +1262,11 @@ describe('#unit Pool', () => { it('should fail to acquire when closed with idle connections', async () => { const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, 0, release)), - destroy: res => { - return Promise.resolve() + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, 0, release)), + destroy: async res => { + return await Promise.resolve() } }) @@ -1286,10 +1286,10 @@ describe('#unit Pool', () => { let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => Promise.resolve(), + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => await Promise.resolve(), config: new PoolConfig(2, 5000) }) @@ -1298,7 +1298,7 @@ describe('#unit Pool', () => { setTimeout(() => { expectNumberOfAcquisitionRequests(pool, address, 1) - r1.close() + ignore(r1.close()) }, 1000) expect(r1).not.toBe(r0) @@ -1310,10 +1310,10 @@ describe('#unit Pool', () => { let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => Promise.resolve(), + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => await Promise.resolve(), config: new PoolConfig(2, 5000) }) @@ -1322,7 +1322,7 @@ describe('#unit Pool', () => { setTimeout(() => { expectNumberOfAcquisitionRequests(pool, address, 1) - r1.close() + ignore(r1.close()) }, 1000) expect(r1).not.toBe(r0) @@ -1333,12 +1333,12 @@ describe('#unit Pool', () => { it('should handle a sequence of request new and the regular request', async () => { let counter = 0 - const destroy = jest.fn(res => Promise.resolve()) + const destroy = jest.fn(async res => await Promise.resolve()) const removeIdleObserver = jest.fn(res => undefined) const address = ServerAddress.fromUrl('bolt://localhost:7687') const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), destroy, removeIdleObserver, config: new PoolConfig(1, 5000) @@ -1391,9 +1391,10 @@ describe('#unit Pool', () => { }) }) -function expectNoPendingAcquisitionRequests (pool) { +function expectNoPendingAcquisitionRequests (pool: Pool): void { + // @ts-expect-error const acquireRequests = pool._acquireRequests - Object.values(acquireRequests).forEach(requests => { + Object.values(acquireRequests).forEach((requests: any) => { if (Array.isArray(requests) && requests.length === 0) { requests = undefined } @@ -1401,33 +1402,37 @@ function expectNoPendingAcquisitionRequests (pool) { }) } -function expectNoIdleResources (pool, address) { +function expectNoIdleResources (pool: Pool, address: ServerAddress): void { if (pool.has(address)) { + // @ts-expect-error expect(pool._pools[address.asKey()].length).toBe(0) } } -function idleResources (pool, address) { +function idleResources (pool: Pool, address: ServerAddress): number | undefined { if (pool.has(address)) { + // @ts-expect-error return pool._pools[address.asKey()].length } return undefined } -function resourceInUse (pool, address) { +function resourceInUse (pool: Pool, address: ServerAddress): number | undefined { if (pool.has(address)) { + // @ts-expect-error return pool._pools[address.asKey()]._elementsInUse.size } return undefined } -function expectNumberOfAcquisitionRequests (pool, address, expectedNumber) { +function expectNumberOfAcquisitionRequests (pool: Pool, address: ServerAddress, expectedNumber: number): void { + // @ts-expect-error expect(pool._acquireRequests[address.asKey()].length).toEqual(expectedNumber) } -function resourceValidOnlyOnceValidationFunction (resource) { +function resourceValidOnlyOnceValidationFunction (resource: Resource): boolean { // all resources are valid only once - if (resource.validatedOnce) { + if (resource.validatedOnce === true) { return false } else { resource.validatedOnce = true @@ -1435,15 +1440,24 @@ function resourceValidOnlyOnceValidationFunction (resource) { } } +function ignore (value: T | Promise): void { + Promise.resolve(value).catch(e => console.error('Error ignore, should not happen', e)) +} + class Resource { - constructor (key, id, release) { - this.id = id - this.key = key + public destroyed: boolean + public observer?: any + public validatedOnce?: boolean + + constructor ( + public key: ServerAddress, + public id: number, + public release: (key: ServerAddress, r: Resource) => (Promise | void)) { this.release = release this.destroyed = false } - close () { + close (): Promise | void { return this.release(this.key, this) } } From b0fae3443e838bf9884c12beb54e66f867e22000 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Barc=C3=A9los?= Date: Tue, 27 Aug 2024 15:20:25 +0200 Subject: [PATCH 03/10] Fixing imports --- .../connection-provider/connection-provider-pooled.js | 9 +++++++-- packages/bolt-connection/src/index.js | 1 - .../connection-provider-direct.test.js | 4 ++-- .../connection-provider-routing.test.js | 4 ++-- 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js b/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js index a85809eff..b8143bfac 100644 --- a/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js +++ b/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js @@ -16,8 +16,7 @@ */ import { createChannelConnection, ConnectionErrorHandler } from '../connection' -import Pool, { PoolConfig } from '../pool' -import { error, ConnectionProvider, ServerInfo, newError } from 'neo4j-driver-core' +import { error, ConnectionProvider, ServerInfo, newError, internal } from 'neo4j-driver-core' import AuthenticationProvider from './authentication-provider' import { object } from '../lang' import LivenessCheckProvider from './liveness-check-provider' @@ -31,6 +30,12 @@ const AUTHENTICATION_ERRORS = [ 'Neo.ClientError.Security.Unauthorized' ] +const { + pool: { + Pool, PoolConfig + } +} = internal + export default class PooledConnectionProvider extends ConnectionProvider { constructor ( { id, config, log, userAgent, boltAgent, authTokenManager, newPool = (...args) => new Pool(...args) }, diff --git a/packages/bolt-connection/src/index.js b/packages/bolt-connection/src/index.js index 0262225b6..d27358e1f 100644 --- a/packages/bolt-connection/src/index.js +++ b/packages/bolt-connection/src/index.js @@ -20,6 +20,5 @@ export * as bolt from './bolt' export * as buf from './buf' export * as channel from './channel' export * as packstream from './packstream' -export * as pool from './pool' export * from './connection-provider' diff --git a/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js b/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js index ff2f10ee0..83cb42519 100644 --- a/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js +++ b/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js @@ -16,7 +16,6 @@ */ import DirectConnectionProvider from '../../src/connection-provider/connection-provider-direct' -import { Pool } from '../../src/pool' import { Connection, DelegateConnection } from '../../src/connection' import { authTokenManagers, internal, newError, ServerInfo, staticAuthTokenManager } from 'neo4j-driver-core' import AuthenticationProvider from '../../src/connection-provider/authentication-provider' @@ -25,7 +24,8 @@ import LivenessCheckProvider from '../../src/connection-provider/liveness-check- const { serverAddress: { ServerAddress }, - logger: { Logger } + logger: { Logger }, + pool: { Pool } } = internal describe('#unit DirectConnectionProvider', () => { diff --git a/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js b/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js index c1eeeb794..297e87d78 100644 --- a/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js +++ b/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js @@ -27,7 +27,6 @@ import { authTokenManagers } from 'neo4j-driver-core' import { RoutingTable } from '../../src/rediscovery/' -import { Pool } from '../../src/pool' import SimpleHostNameResolver from '../../src/channel/browser/browser-host-name-resolver' import RoutingConnectionProvider from '../../src/connection-provider/connection-provider-routing' import { DelegateConnection, Connection } from '../../src/connection' @@ -37,7 +36,8 @@ import LivenessCheckProvider from '../../src/connection-provider/liveness-check- const { serverAddress: { ServerAddress }, - logger: { Logger } + logger: { Logger }, + pool: { Pool } } = internal const { SERVICE_UNAVAILABLE, SESSION_EXPIRED } = error From 597735a39a87847bf2c3c366aeee1fc5948d1f44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Barc=C3=A9los?= Date: Tue, 27 Aug 2024 15:21:40 +0200 Subject: [PATCH 04/10] Sync deno --- .../connection-provider-pooled.js | 9 +- .../lib/bolt-connection/index.js | 1 - .../lib/core/internal/index.ts | 4 +- .../index.js => core/internal/pool/index.ts} | 4 +- .../internal/pool/pool-config.ts} | 20 +- .../pool.js => core/internal/pool/pool.ts} | 232 ++++++++++++------ 6 files changed, 176 insertions(+), 94 deletions(-) rename packages/neo4j-driver-deno/lib/{bolt-connection/pool/index.js => core/internal/pool/index.ts} (93%) rename packages/neo4j-driver-deno/lib/{bolt-connection/pool/pool-config.js => core/internal/pool/pool-config.ts} (72%) rename packages/neo4j-driver-deno/lib/{bolt-connection/pool/pool.js => core/internal/pool/pool.ts} (65%) diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/connection-provider/connection-provider-pooled.js b/packages/neo4j-driver-deno/lib/bolt-connection/connection-provider/connection-provider-pooled.js index 04ca8bf66..53902b722 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/connection-provider/connection-provider-pooled.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/connection-provider/connection-provider-pooled.js @@ -16,8 +16,7 @@ */ import { createChannelConnection, ConnectionErrorHandler } from '../connection/index.js' -import Pool, { PoolConfig } from '../pool/index.js' -import { error, ConnectionProvider, ServerInfo, newError } from '../../core/index.ts' +import { error, ConnectionProvider, ServerInfo, newError, internal } from '../../core/index.ts' import AuthenticationProvider from './authentication-provider.js' import { object } from '../lang/index.js' import LivenessCheckProvider from './liveness-check-provider.js' @@ -31,6 +30,12 @@ const AUTHENTICATION_ERRORS = [ 'Neo.ClientError.Security.Unauthorized' ] +const { + pool: { + Pool, PoolConfig + } +} = internal + export default class PooledConnectionProvider extends ConnectionProvider { constructor ( { id, config, log, userAgent, boltAgent, authTokenManager, newPool = (...args) => new Pool(...args) }, diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/index.js b/packages/neo4j-driver-deno/lib/bolt-connection/index.js index 676118737..fbb9783a5 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/index.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/index.js @@ -20,6 +20,5 @@ export * as bolt from './bolt/index.js' export * as buf from './buf/index.js' export * as channel from './channel/index.js' export * as packstream from './packstream/index.js' -export * as pool from './pool/index.js' export * from './connection-provider/index.js' diff --git a/packages/neo4j-driver-deno/lib/core/internal/index.ts b/packages/neo4j-driver-deno/lib/core/internal/index.ts index 283de1eec..771082496 100644 --- a/packages/neo4j-driver-deno/lib/core/internal/index.ts +++ b/packages/neo4j-driver-deno/lib/core/internal/index.ts @@ -29,6 +29,7 @@ import * as serverAddress from './server-address.ts' import * as resolver from './resolver/index.ts' import * as objectUtil from './object-util.ts' import * as boltAgent from './bolt-agent/index.ts' +import * as pool from './pool/index.ts' export { util, @@ -44,5 +45,6 @@ export { serverAddress, resolver, objectUtil, - boltAgent + boltAgent, + pool } diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/pool/index.js b/packages/neo4j-driver-deno/lib/core/internal/pool/index.ts similarity index 93% rename from packages/neo4j-driver-deno/lib/bolt-connection/pool/index.js rename to packages/neo4j-driver-deno/lib/core/internal/pool/index.ts index 3329abda8..b9d918cfd 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/pool/index.js +++ b/packages/neo4j-driver-deno/lib/core/internal/pool/index.ts @@ -18,8 +18,8 @@ import PoolConfig, { DEFAULT_ACQUISITION_TIMEOUT, DEFAULT_MAX_SIZE -} from './pool-config.js' -import Pool from './pool.js' +} from './pool-config.ts' +import Pool from './pool.ts' export default Pool export { Pool, PoolConfig, DEFAULT_ACQUISITION_TIMEOUT, DEFAULT_MAX_SIZE } diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/pool/pool-config.js b/packages/neo4j-driver-deno/lib/core/internal/pool/pool-config.ts similarity index 72% rename from packages/neo4j-driver-deno/lib/bolt-connection/pool/pool-config.js rename to packages/neo4j-driver-deno/lib/core/internal/pool/pool-config.ts index a31e72076..6582f4882 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/pool/pool-config.js +++ b/packages/neo4j-driver-deno/lib/core/internal/pool/pool-config.ts @@ -19,7 +19,10 @@ const DEFAULT_MAX_SIZE = 100 const DEFAULT_ACQUISITION_TIMEOUT = 60 * 1000 // 60 seconds export default class PoolConfig { - constructor (maxSize, acquisitionTimeout) { + public readonly maxSize: number + public readonly acquisitionTimeout: number + + constructor (maxSize: number, acquisitionTimeout: number) { this.maxSize = valueOrDefault(maxSize, DEFAULT_MAX_SIZE) this.acquisitionTimeout = valueOrDefault( acquisitionTimeout, @@ -31,15 +34,14 @@ export default class PoolConfig { return new PoolConfig(DEFAULT_MAX_SIZE, DEFAULT_ACQUISITION_TIMEOUT) } - static fromDriverConfig (config) { - const maxSizeConfigured = isConfigured(config.maxConnectionPoolSize) - const maxSize = maxSizeConfigured + static fromDriverConfig (config: { maxConnectionPoolSize?: number, connectionAcquisitionTimeout?: number} ) { + const maxSize = isConfigured(config.maxConnectionPoolSize) ? config.maxConnectionPoolSize : DEFAULT_MAX_SIZE - const acquisitionTimeoutConfigured = isConfigured( + + const acquisitionTimeout = isConfigured( config.connectionAcquisitionTimeout ) - const acquisitionTimeout = acquisitionTimeoutConfigured ? config.connectionAcquisitionTimeout : DEFAULT_ACQUISITION_TIMEOUT @@ -47,12 +49,12 @@ export default class PoolConfig { } } -function valueOrDefault (value, defaultValue) { +function valueOrDefault (value: number | undefined, defaultValue: number) { return value === 0 || value ? value : defaultValue } -function isConfigured (value) { - return value === 0 || value +function isConfigured (value?: number): value is number { + return value === 0 || value != null } export { DEFAULT_MAX_SIZE, DEFAULT_ACQUISITION_TIMEOUT } diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/pool/pool.js b/packages/neo4j-driver-deno/lib/core/internal/pool/pool.ts similarity index 65% rename from packages/neo4j-driver-deno/lib/bolt-connection/pool/pool.js rename to packages/neo4j-driver-deno/lib/core/internal/pool/pool.ts index 072fc47e3..d97a32668 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/pool/pool.js +++ b/packages/neo4j-driver-deno/lib/core/internal/pool/pool.ts @@ -15,14 +15,47 @@ * limitations under the License. */ -import PoolConfig from './pool-config.js' -import { newError, internal } from '../../core/index.ts' +import PoolConfig from './pool-config.ts' +import { newError } from '../../error.ts' +import { Logger } from '../logger.ts' +import { ServerAddress } from '../server-address.ts' + +type Release = (address: ServerAddress, resource: R) => Promise +type Create = (acquisitionContext: unknown, address: ServerAddress, release: Release) => Promise +type Destroy = (resource: R) => Promise +type ValidateOnAcquire = (acquisitionContext: unknown, resource: R) => (Promise | boolean) +type ValidateOnRelease = (resource: R) => (Promise | boolean) +type InstallObserver = (resource: R, observer: unknown) => void +type RemoveObserver = (resource: R) => void +interface AcquisitionConfig { requireNew?: boolean } + +interface ConstructorParam { + create?: Create + destroy?: Destroy + validateOnAcquire?: ValidateOnAcquire + validateOnRelease?: ValidateOnRelease + installIdleObserver?: InstallObserver + removeIdleObserver?: RemoveObserver + config?: PoolConfig + log?: Logger +} -const { - logger: { Logger } -} = internal +class Pool { + private readonly _create: Create + private readonly _destroy: Destroy + private readonly _validateOnAcquire: ValidateOnAcquire + private readonly _validateOnRelease: ValidateOnRelease + private readonly _installIdleObserver: InstallObserver + private readonly _removeIdleObserver: RemoveObserver + private readonly _maxSize: number + private readonly _acquisitionTimeout: number + private readonly _log: Logger + private readonly _pools: { [key: string]: SingleAddressPool } + private readonly _pendingCreates: { [key: string]: number } + private readonly _acquireRequests: { [key: string]: Array> } + private readonly _activeResourceCounts: { [key: string]: number } + private _closed: boolean -class Pool { /** * @param {function(acquisitionContext: object, address: ServerAddress, function(address: ServerAddress, resource: object): Promise): Promise} create * an allocation function that creates a promise with a new resource. It's given an address for which to @@ -44,15 +77,15 @@ class Pool { * @param {Logger} log the driver logger. */ constructor ({ - create = (acquisitionContext, address, release) => Promise.resolve(), - destroy = conn => Promise.resolve(), + create = async (acquisitionContext, address, release) => await Promise.reject(new Error('Not implemented')), + destroy = async conn => await Promise.resolve(), validateOnAcquire = (acquisitionContext, conn) => true, validateOnRelease = (conn) => true, installIdleObserver = (conn, observer) => {}, removeIdleObserver = conn => {}, config = PoolConfig.defaultConfig(), log = Logger.noOp() - } = {}) { + }: ConstructorParam) { this._create = create this._destroy = destroy this._validateOnAcquire = validateOnAcquire @@ -78,25 +111,23 @@ class Pool { * @param {boolean} config.requireNew Indicate it requires a new resource * @return {Promise} resource that is ready to use. */ - acquire (acquisitionContext, address, config) { + async acquire (acquisitionContext: unknown, address: ServerAddress, config?: AcquisitionConfig): Promise { const key = address.asKey() // We're out of resources and will try to acquire later on when an existing resource is released. const allRequests = this._acquireRequests const requests = allRequests[key] - if (!requests) { + if (requests == null) { allRequests[key] = [] } - return new Promise((resolve, reject) => { - let request = null - + return await new Promise((resolve, reject) => { const timeoutId = setTimeout(() => { // acquisition timeout fired // remove request from the queue of pending requests, if it's still there // request might've been taken out by the release operation const pendingRequests = allRequests[key] - if (pendingRequests) { + if (pendingRequests != null) { allRequests[key] = pendingRequests.filter(item => item !== request) } @@ -115,7 +146,7 @@ class Pool { }, this._acquisitionTimeout) typeof timeoutId === 'object' && timeoutId.unref() - request = new PendingRequest(key, acquisitionContext, config, resolve, reject, timeoutId, this._log) + const request = new PendingRequest(key, acquisitionContext, config, resolve, reject, timeoutId, this._log) allRequests[key].push(request) this._processPendingAcquireRequests(address) }) @@ -126,11 +157,11 @@ class Pool { * @param {ServerAddress} address the address of the server to purge its pool. * @returns {Promise} A promise that is resolved when the resources are purged */ - purge (address) { - return this._purgeKey(address.asKey()) + async purge (address: ServerAddress): Promise { + return await this._purgeKey(address.asKey()) } - apply (address, resourceConsumer) { + apply (address: ServerAddress, resourceConsumer: (resource: R) => void): void { const key = address.asKey() if (key in this._pools) { @@ -142,12 +173,12 @@ class Pool { * Destroy all idle resources in this pool. * @returns {Promise} A promise that is resolved when the resources are purged */ - async close () { + async close (): Promise { this._closed = true /** * The lack of Promise consuming was making the driver do not close properly in the scenario * captured at result.test.js:it('should handle missing onCompleted'). The test was timing out - * because while wainting for the driver close. + * because while waiting for the driver close. * * Consuming the Promise.all or by calling then or by awaiting in the result inside this method solved * the issue somehow. @@ -156,20 +187,20 @@ class Pool { * seems to be need also. */ return await Promise.all( - Object.keys(this._pools).map(key => this._purgeKey(key)) - ) + Object.keys(this._pools).map(async key => await this._purgeKey(key)) + ).then() } /** * Keep the idle resources for the provided addresses and purge the rest. * @returns {Promise} A promise that is resolved when the other resources are purged */ - keepAll (addresses) { + async keepAll (addresses: ServerAddress[]): Promise { const keysToKeep = addresses.map(a => a.asKey()) const keysPresent = Object.keys(this._pools) - const keysToPurge = keysPresent.filter(k => keysToKeep.indexOf(k) === -1) + const keysToPurge = keysPresent.filter(k => !keysToKeep.includes(k)) - return Promise.all(keysToPurge.map(key => this._purgeKey(key))) + return await Promise.all(keysToPurge.map(async key => await this._purgeKey(key))).then() } /** @@ -177,7 +208,7 @@ class Pool { * @param {ServerAddress} address the address of the server to check. * @return {boolean} `true` when pool contains entries for the given key, false otherwise. */ - has (address) { + has (address: ServerAddress): boolean { return address.asKey() in this._pools } @@ -186,21 +217,21 @@ class Pool { * @param {ServerAddress} address the address of the server to check. * @return {number} count of resources acquired by clients. */ - activeResourceCount (address) { - return this._activeResourceCounts[address.asKey()] || 0 + activeResourceCount (address: ServerAddress): number { + return this._activeResourceCounts[address.asKey()] ?? 0 } - _getOrInitializePoolFor (key) { + _getOrInitializePoolFor (key: string): SingleAddressPool { let pool = this._pools[key] - if (!pool) { - pool = new SingleAddressPool() + if (pool == null) { + pool = new SingleAddressPool() this._pools[key] = pool this._pendingCreates[key] = 0 } return pool } - async _acquire (acquisitionContext, address, requireNew) { + async _acquire (acquisitionContext: unknown, address: ServerAddress, requireNew: boolean): Promise<{ resource: R | null, pool: SingleAddressPool }> { if (this._closed) { throw newError('Pool is closed, it is no more able to serve requests.') } @@ -208,10 +239,14 @@ class Pool { const key = address.asKey() const pool = this._getOrInitializePoolFor(key) if (!requireNew) { - while (pool.length) { + while (pool.length > 0) { const resource = pool.pop() - if (this._removeIdleObserver) { + if (resource == null) { + continue + } + + if (this._removeIdleObserver != null) { this._removeIdleObserver(resource) } @@ -219,6 +254,7 @@ class Pool { // idle resource is valid and can be acquired resourceAcquired(key, this._activeResourceCounts) if (this._log.isDebugEnabled()) { + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions this._log.debug(`${resource} acquired from the pool ${key}`) } return { resource, pool } @@ -249,18 +285,21 @@ class Pool { const numConnections = this.activeResourceCount(address) + pool.length if (numConnections >= this._maxSize && requireNew) { const resource = pool.pop() - if (this._removeIdleObserver) { - this._removeIdleObserver(resource) + if (resource != null) { + if (this._removeIdleObserver != null) { + this._removeIdleObserver(resource) + } + pool.removeInUse(resource) + await this._destroy(resource) } - pool.removeInUse(resource) - await this._destroy(resource) } // Invoke callback that creates actual connection - resource = await this._create(acquisitionContext, address, (address, resource) => this._release(address, resource, pool)) + resource = await this._create(acquisitionContext, address, async (address, resource) => await this._release(address, resource, pool)) pool.pushInUse(resource) resourceAcquired(key, this._activeResourceCounts) if (this._log.isDebugEnabled()) { + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions this._log.debug(`${resource} created for the pool ${key}`) } } finally { @@ -269,7 +308,7 @@ class Pool { return { resource, pool } } - async _release (address, resource, pool) { + async _release (address: ServerAddress, resource: R, pool: SingleAddressPool): Promise { const key = address.asKey() try { @@ -278,20 +317,22 @@ class Pool { if (!await this._validateOnRelease(resource)) { if (this._log.isDebugEnabled()) { this._log.debug( + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions `${resource} destroyed and can't be released to the pool ${key} because it is not functional` ) } pool.removeInUse(resource) await this._destroy(resource) } else { - if (this._installIdleObserver) { + if (this._installIdleObserver != null) { this._installIdleObserver(resource, { - onError: error => { + onError: (error: Error) => { this._log.debug( + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions `Idle connection ${resource} destroyed because of error: ${error}` ) const pool = this._pools[key] - if (pool) { + if (pool != null) { this._pools[key] = pool.filter(r => r !== resource) pool.removeInUse(resource) } @@ -304,6 +345,7 @@ class Pool { } pool.push(resource) if (this._log.isDebugEnabled()) { + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions this._log.debug(`${resource} released to the pool ${key}`) } } @@ -311,6 +353,7 @@ class Pool { // key has been purged, don't put it back, just destroy the resource if (this._log.isDebugEnabled()) { this._log.debug( + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions `${resource} destroyed and can't be released to the pool ${key} because pool has been purged` ) } @@ -324,45 +367,58 @@ class Pool { } } - async _purgeKey (key) { + async _purgeKey (key: string): Promise { const pool = this._pools[key] const destructionList = [] - if (pool) { - while (pool.length) { + if (pool != null) { + while (pool.length > 0) { const resource = pool.pop() - if (this._removeIdleObserver) { + if (resource == null) { + continue + } + + if (this._removeIdleObserver != null) { this._removeIdleObserver(resource) } destructionList.push(this._destroy(resource)) } pool.close() + // eslint-disable-next-line @typescript-eslint/no-dynamic-delete delete this._pools[key] await Promise.all(destructionList) } } - _processPendingAcquireRequests (address) { + _processPendingAcquireRequests (address: ServerAddress): void { const key = address.asKey() const requests = this._acquireRequests[key] - if (requests) { + if (requests != null) { const pendingRequest = requests.shift() // pop a pending acquire request - if (pendingRequest) { + if (pendingRequest != null) { this._acquire(pendingRequest.context, address, pendingRequest.requireNew) .catch(error => { // failed to acquire/create a new connection to resolve the pending acquire request // propagate the error by failing the pending request pendingRequest.reject(error) - return { resource: null } + return { resource: null, pool: null } }) .then(({ resource, pool }) => { - if (resource) { + // there is not situation where the pool resource is not null and the + // pool is null. + if (resource != null && pool != null) { // managed to acquire a valid resource from the pool if (pendingRequest.isCompleted()) { // request has been completed, most likely failed by a timeout // return the acquired resource back to the pool this._release(address, resource, pool) + .catch(error => { + if (this._log.isDebugEnabled()) { + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + this._log.debug(`${resource} could not be release back to the pool. Cause: ${error}`) + } + }) } else { // request is still pending and can be resolved with the newly acquired resource pendingRequest.resolve(resource) // resolve the pending request with the acquired resource @@ -371,14 +427,15 @@ class Pool { // failed to acquire a valid resource from the pool // return the pending request back to the pool if (!pendingRequest.isCompleted()) { - if (!this._acquireRequests[key]) { + if (this._acquireRequests[key] == null) { this._acquireRequests[key] = [] } this._acquireRequests[key].unshift(pendingRequest) } } - }) + }).catch(error => pendingRequest.reject(error)) } else { + // eslint-disable-next-line @typescript-eslint/no-dynamic-delete delete this._acquireRequests[key] } } @@ -390,8 +447,8 @@ class Pool { * @param {string} key the resource group identifier (server address for connections). * @param {Object.} activeResourceCounts the object holding active counts per key. */ -function resourceAcquired (key, activeResourceCounts) { - const currentCount = activeResourceCounts[key] || 0 +function resourceAcquired (key: string, activeResourceCounts: { [key: string]: number }): void { + const currentCount = activeResourceCounts[key] ?? 0 activeResourceCounts[key] = currentCount + 1 } @@ -400,19 +457,29 @@ function resourceAcquired (key, activeResourceCounts) { * @param {string} key the resource group identifier (server address for connections). * @param {Object.} activeResourceCounts the object holding active counts per key. */ -function resourceReleased (key, activeResourceCounts) { - const currentCount = activeResourceCounts[key] || 0 +function resourceReleased (key: string, activeResourceCounts: { [key: string]: number }): void { + const currentCount = activeResourceCounts[key] ?? 0 const nextCount = currentCount - 1 if (nextCount > 0) { activeResourceCounts[key] = nextCount } else { + // eslint-disable-next-line @typescript-eslint/no-dynamic-delete delete activeResourceCounts[key] } } -class PendingRequest { - constructor (key, context, config, resolve, reject, timeoutId, log) { +class PendingRequest { + private readonly _key: string + private readonly _context: unknown + private readonly _config: AcquisitionConfig + private readonly _resolve: (resource: R) => void + private readonly _reject: (error: Error) => void + private readonly _timeoutId: any + private readonly _log: Logger + private _completed: boolean + + constructor (key: string, context: unknown, config: AcquisitionConfig | undefined, resolve: (resource: R) => void, reject: (error: Error) => void, timeoutId: any, log: Logger) { this._key = key this._context = context this._resolve = resolve @@ -420,22 +487,22 @@ class PendingRequest { this._timeoutId = timeoutId this._log = log this._completed = false - this._config = config || {} + this._config = config ?? {} } - get context () { + get context (): unknown { return this._context } - get requireNew () { - return this._config.requireNew || false + get requireNew (): boolean { + return this._config.requireNew ?? false } - isCompleted () { + isCompleted (): boolean { return this._completed } - resolve (resource) { + resolve (resource: R): void { if (this._completed) { return } @@ -443,12 +510,13 @@ class PendingRequest { clearTimeout(this._timeoutId) if (this._log.isDebugEnabled()) { + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions this._log.debug(`${resource} acquired from the pool ${this._key}`) } this._resolve(resource) } - reject (error) { + reject (error: Error): void { if (this._completed) { return } @@ -459,53 +527,59 @@ class PendingRequest { } } -class SingleAddressPool { +class SingleAddressPool { + private _active: boolean + private _elements: R[] + private _elementsInUse: Set + constructor () { this._active = true this._elements = [] this._elementsInUse = new Set() } - isActive () { + isActive (): boolean { return this._active } - close () { + close (): void { this._active = false this._elements = [] this._elementsInUse = new Set() } - filter (predicate) { + filter (predicate: (resource: R) => boolean): SingleAddressPool { this._elements = this._elements.filter(predicate) return this } - apply (resourceConsumer) { + apply (resourceConsumer: (resource: R) => void): void { this._elements.forEach(resourceConsumer) this._elementsInUse.forEach(resourceConsumer) } - get length () { + get length (): number { return this._elements.length } - pop () { + pop (): R | undefined { const element = this._elements.pop() - this._elementsInUse.add(element) + if (element != null) { + this._elementsInUse.add(element) + } return element } - push (element) { + push (element: R): number { this._elementsInUse.delete(element) return this._elements.push(element) } - pushInUse (element) { + pushInUse (element: R): void { this._elementsInUse.add(element) } - removeInUse (element) { + removeInUse (element: R): void { this._elementsInUse.delete(element) } } From 3cb0cfe65b7d994174a89e0221c4434c16399a05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Barc=C3=A9los?= Date: Tue, 27 Aug 2024 15:38:31 +0200 Subject: [PATCH 05/10] Lint --- packages/core/src/internal/pool/pool-config.ts | 10 +++++----- .../lib/core/internal/pool/pool-config.ts | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/packages/core/src/internal/pool/pool-config.ts b/packages/core/src/internal/pool/pool-config.ts index 6582f4882..b1f5e5d94 100644 --- a/packages/core/src/internal/pool/pool-config.ts +++ b/packages/core/src/internal/pool/pool-config.ts @@ -30,15 +30,15 @@ export default class PoolConfig { ) } - static defaultConfig () { + static defaultConfig (): PoolConfig { return new PoolConfig(DEFAULT_MAX_SIZE, DEFAULT_ACQUISITION_TIMEOUT) } - static fromDriverConfig (config: { maxConnectionPoolSize?: number, connectionAcquisitionTimeout?: number} ) { + static fromDriverConfig (config: { maxConnectionPoolSize?: number, connectionAcquisitionTimeout?: number }): PoolConfig { const maxSize = isConfigured(config.maxConnectionPoolSize) ? config.maxConnectionPoolSize : DEFAULT_MAX_SIZE - + const acquisitionTimeout = isConfigured( config.connectionAcquisitionTimeout ) @@ -49,8 +49,8 @@ export default class PoolConfig { } } -function valueOrDefault (value: number | undefined, defaultValue: number) { - return value === 0 || value ? value : defaultValue +function valueOrDefault (value: number | undefined, defaultValue: number): number { + return isConfigured(value) ? value : defaultValue } function isConfigured (value?: number): value is number { diff --git a/packages/neo4j-driver-deno/lib/core/internal/pool/pool-config.ts b/packages/neo4j-driver-deno/lib/core/internal/pool/pool-config.ts index 6582f4882..b1f5e5d94 100644 --- a/packages/neo4j-driver-deno/lib/core/internal/pool/pool-config.ts +++ b/packages/neo4j-driver-deno/lib/core/internal/pool/pool-config.ts @@ -30,15 +30,15 @@ export default class PoolConfig { ) } - static defaultConfig () { + static defaultConfig (): PoolConfig { return new PoolConfig(DEFAULT_MAX_SIZE, DEFAULT_ACQUISITION_TIMEOUT) } - static fromDriverConfig (config: { maxConnectionPoolSize?: number, connectionAcquisitionTimeout?: number} ) { + static fromDriverConfig (config: { maxConnectionPoolSize?: number, connectionAcquisitionTimeout?: number }): PoolConfig { const maxSize = isConfigured(config.maxConnectionPoolSize) ? config.maxConnectionPoolSize : DEFAULT_MAX_SIZE - + const acquisitionTimeout = isConfigured( config.connectionAcquisitionTimeout ) @@ -49,8 +49,8 @@ export default class PoolConfig { } } -function valueOrDefault (value: number | undefined, defaultValue: number) { - return value === 0 || value ? value : defaultValue +function valueOrDefault (value: number | undefined, defaultValue: number): number { + return isConfigured(value) ? value : defaultValue } function isConfigured (value?: number): value is number { From 88e860dc8eeede9f8371c08a1a0b7375df516f29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Barc=C3=A9los?= Date: Tue, 27 Aug 2024 16:12:45 +0200 Subject: [PATCH 06/10] Fix testinFix testingg --- packages/neo4j-driver/test/driver.test.js | 7 ++----- packages/neo4j-driver/test/internal/pool-config.test.js | 9 +++++---- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/packages/neo4j-driver/test/driver.test.js b/packages/neo4j-driver/test/driver.test.js index 1be602cba..224086a56 100644 --- a/packages/neo4j-driver/test/driver.test.js +++ b/packages/neo4j-driver/test/driver.test.js @@ -18,15 +18,12 @@ import neo4j from '../src' import sharedNeo4j from './internal/shared-neo4j' import lolex from 'lolex' -import { - DEFAULT_ACQUISITION_TIMEOUT, - DEFAULT_MAX_SIZE -} from '../../bolt-connection/lib/pool/pool-config' import testUtils from './internal/test-utils' import { json, internal, bookmarkManager } from 'neo4j-driver-core' const { - bookmarks: { Bookmarks } + bookmarks: { Bookmarks }, + pool: { DEFAULT_ACQUISITION_TIMEOUT, DEFAULT_MAX_SIZE } } = internal // As long as driver creation doesn't touch the network it's fine to run diff --git a/packages/neo4j-driver/test/internal/pool-config.test.js b/packages/neo4j-driver/test/internal/pool-config.test.js index 43218098e..7361b250e 100644 --- a/packages/neo4j-driver/test/internal/pool-config.test.js +++ b/packages/neo4j-driver/test/internal/pool-config.test.js @@ -15,10 +15,11 @@ * limitations under the License. */ -import PoolConfig, { - DEFAULT_ACQUISITION_TIMEOUT, - DEFAULT_MAX_SIZE -} from '../../../bolt-connection/lib/pool/pool-config' +import { internal } from 'neo4j-driver-core' + +const { + pool: { DEFAULT_ACQUISITION_TIMEOUT, DEFAULT_MAX_SIZE, PoolConfig } +} = internal describe('#unit PoolConfig', () => { let originalConsoleWarn From 1bd02ff99e30ed44d1b21831be89781e59905fc2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Barc=C3=A9los?= Date: Tue, 27 Aug 2024 16:23:09 +0200 Subject: [PATCH 07/10] Adjust import --- .../least-connected-load-balancing-strategy.test.js | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/neo4j-driver/test/internal/least-connected-load-balancing-strategy.test.js b/packages/neo4j-driver/test/internal/least-connected-load-balancing-strategy.test.js index 75362f288..882dd0faf 100644 --- a/packages/neo4j-driver/test/internal/least-connected-load-balancing-strategy.test.js +++ b/packages/neo4j-driver/test/internal/least-connected-load-balancing-strategy.test.js @@ -14,10 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import Pool from '../../../bolt-connection/lib/pool/pool' +import { internal } from 'neo4j-driver-core' import { loadBalancing } from 'neo4j-driver-bolt-connection' const { LeastConnectedLoadBalancingStrategy } = loadBalancing +const { + pool: { Pool } +} = internal + describe('#unit LeastConnectedLoadBalancingStrategy', () => { it('should return null when no readers', () => { const knownReaders = [] From 523c44857ddd93a761ef629dddfa5685800aeddd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Barc=C3=A9los?= Date: Tue, 27 Aug 2024 16:32:16 +0200 Subject: [PATCH 08/10] Only object on node --- packages/core/src/internal/pool/pool.ts | 7 ++++++- packages/neo4j-driver-deno/lib/core/internal/pool/pool.ts | 6 +++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/packages/core/src/internal/pool/pool.ts b/packages/core/src/internal/pool/pool.ts index 5fb447d2b..f6c0872eb 100644 --- a/packages/core/src/internal/pool/pool.ts +++ b/packages/core/src/internal/pool/pool.ts @@ -144,7 +144,12 @@ class Pool { ) } }, this._acquisitionTimeout) - typeof timeoutId === 'object' && timeoutId.unref() + + if (typeof timeoutId === 'object') { + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-expect-error + timeoutId.unref() + } const request = new PendingRequest(key, acquisitionContext, config, resolve, reject, timeoutId, this._log) allRequests[key].push(request) diff --git a/packages/neo4j-driver-deno/lib/core/internal/pool/pool.ts b/packages/neo4j-driver-deno/lib/core/internal/pool/pool.ts index d97a32668..9d20b9442 100644 --- a/packages/neo4j-driver-deno/lib/core/internal/pool/pool.ts +++ b/packages/neo4j-driver-deno/lib/core/internal/pool/pool.ts @@ -144,7 +144,11 @@ class Pool { ) } }, this._acquisitionTimeout) - typeof timeoutId === 'object' && timeoutId.unref() + + if (typeof timeoutId === 'object') { + // @ts-ignore + timeoutId.unref() + } const request = new PendingRequest(key, acquisitionContext, config, resolve, reject, timeoutId, this._log) allRequests[key].push(request) From cd87d105fa706fd6706e6e4cdb3dc2523bd0c65f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Barc=C3=A9los?= Date: Tue, 27 Aug 2024 16:36:08 +0200 Subject: [PATCH 09/10] @ts-ignore --- packages/core/src/internal/pool/pool.ts | 4 ++-- packages/neo4j-driver-deno/lib/core/internal/pool/pool.ts | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/core/src/internal/pool/pool.ts b/packages/core/src/internal/pool/pool.ts index f6c0872eb..9071dedee 100644 --- a/packages/core/src/internal/pool/pool.ts +++ b/packages/core/src/internal/pool/pool.ts @@ -146,8 +146,8 @@ class Pool { }, this._acquisitionTimeout) if (typeof timeoutId === 'object') { - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-expect-error + // eslint-disable-next-line + // @ts-ignore timeoutId.unref() } diff --git a/packages/neo4j-driver-deno/lib/core/internal/pool/pool.ts b/packages/neo4j-driver-deno/lib/core/internal/pool/pool.ts index 9d20b9442..dca6951b4 100644 --- a/packages/neo4j-driver-deno/lib/core/internal/pool/pool.ts +++ b/packages/neo4j-driver-deno/lib/core/internal/pool/pool.ts @@ -146,6 +146,7 @@ class Pool { }, this._acquisitionTimeout) if (typeof timeoutId === 'object') { + // eslint-disable-next-line // @ts-ignore timeoutId.unref() } From 32b90ee80ffcfe6402f4bdd2a10345803911b137 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Barc=C3=A9los?= Date: Tue, 27 Aug 2024 16:37:25 +0200 Subject: [PATCH 10/10] Sync Deno --- packages/neo4j-driver-deno/lib/core/internal/pool/pool.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/neo4j-driver-deno/lib/core/internal/pool/pool.ts b/packages/neo4j-driver-deno/lib/core/internal/pool/pool.ts index dca6951b4..e657c26e4 100644 --- a/packages/neo4j-driver-deno/lib/core/internal/pool/pool.ts +++ b/packages/neo4j-driver-deno/lib/core/internal/pool/pool.ts @@ -146,7 +146,7 @@ class Pool { }, this._acquisitionTimeout) if (typeof timeoutId === 'object') { - // eslint-disable-next-line + // eslint-disable-next-line // @ts-ignore timeoutId.unref() }