diff --git a/src/v1/internal/pool.js b/src/v1/internal/pool.js index aa2b9f247..2288f5c8c 100644 --- a/src/v1/internal/pool.js +++ b/src/v1/internal/pool.js @@ -154,25 +154,25 @@ class Pool { // key has been purged, don't put it back, just destroy the resource this._destroy(resource); } + resourceReleased(key, this._activeResourceCounts); // check if there are any pending requests const requests = this._acquireRequests[key]; if (requests) { - const pending = requests.shift(); + const pending = requests[0]; if (pending) { const resource = this._acquire(key); if (resource) { - pending.resolve(resource); - - return; + // managed to acquire a valid resource from the pool to satisfy the pending acquire request + resourceAcquired(key, this._activeResourceCounts); // increment the active counter + requests.shift(); // forget the pending request + pending.resolve(resource); // resolve the pending request with the acquired resource } } else { delete this._acquireRequests[key]; } } - - resourceReleased(key, this._activeResourceCounts); } } diff --git a/test/internal/pool.test.js b/test/internal/pool.test.js index 0f46fecd2..12d2e9804 100644 --- a/test/internal/pool.test.js +++ b/test/internal/pool.test.js @@ -522,16 +522,106 @@ describe('Pool', () => { }); }); + it('should resolve pending acquisition request when single invalid resource returned', done => { + const key = 'bolt://localhost:7687'; + const acquisitionTimeout = 1000; + let counter = 0; + + const pool = new Pool( + (url, release) => new Resource(url, counter++, release), + resource => { + }, + resourceValidOnlyOnceValidationFunction, + new PoolConfig(1, acquisitionTimeout) + ); + + pool.acquire(key).then(resource1 => { + expect(resource1.id).toEqual(0); + expect(pool.activeResourceCount(key)).toEqual(1); + + // release the resource before the acquisition timeout, it should be treated as invalid + setTimeout(() => { + expectNumberOfAcquisitionRequests(pool, key, 1); + resource1.close(); + }, acquisitionTimeout / 2); + + pool.acquire(key).then(resource2 => { + expect(resource2.id).toEqual(1); + expectNoPendingAcquisitionRequests(pool); + expect(pool.activeResourceCount(key)).toEqual(1); + done(); + }).catch(error => { + done.fail(error); + }); + }); + }); + + it('should work fine when invalid resources released and acquisition attempt pending', done => { + const key = 'bolt://localhost:7687'; + const acquisitionTimeout = 1000; + let counter = 0; + + const pool = new Pool( + (url, release) => new Resource(url, counter++, release), + resource => { + }, + resourceValidOnlyOnceValidationFunction, + new PoolConfig(2, acquisitionTimeout) + ); + + pool.acquire(key).then(resource1 => { + expect(resource1.id).toEqual(0); + expect(pool.activeResourceCount(key)).toEqual(1); + + pool.acquire(key).then(resource2 => { + expect(resource2.id).toEqual(1); + expect(pool.activeResourceCount(key)).toEqual(2); + + // release both resources before the acquisition timeout, they should be treated as invalid + setTimeout(() => { + expectNumberOfAcquisitionRequests(pool, key, 1); + resource1.close(); + resource2.close(); + }, acquisitionTimeout / 2); + + pool.acquire(key).then(resource3 => { + expect(resource3.id).toEqual(2); + expectNoPendingAcquisitionRequests(pool); + expect(pool.activeResourceCount(key)).toEqual(1); + done(); + }).catch(error => { + done.fail(error); + }); + }); + }); + }); + }); function expectNoPendingAcquisitionRequests(pool) { - expect(pool._acquireRequests).toEqual({}); + const acquireRequests = pool._acquireRequests; + Object.values(acquireRequests).forEach(requests => { + if (Array.isArray(requests) && requests.length === 0) { + requests = undefined; + } + expect(requests).not.toBeDefined(); + }); } function expectNumberOfAcquisitionRequests(pool, key, expectedNumber) { expect(pool._acquireRequests[key].length).toEqual(expectedNumber); } +function resourceValidOnlyOnceValidationFunction(resource) { + // all resources are valid only once + if (resource.validatedOnce) { + return false; + } else { + resource.validatedOnce = true; + return true; + } +} + class Resource { constructor(key, id, release) {