From 0dd57b4a72c28cc7e9b398b72d34dff4bbe2bd49 Mon Sep 17 00:00:00 2001 From: lutovich Date: Thu, 3 May 2018 15:10:47 +0200 Subject: [PATCH 1/2] Fix release of broken connections to the pool An attempt to resolve a pending acquisition request can be made when connection is returned back to the pool. It will only be made when there exists a pending acquisition request and returned connection is valid. It was previously possible for the release operation to deque a pending acquisition request and forget about it, even when the returned connection was invalid. This made the acquisition request either fail with acquisition timeout or with "Cannot read property 'filter' of undefined" error. Later happened because subsequent release operation for the same key removed the array of pending acquisition requests. This commit fixes the problem by making release operation only deque an acquisition request when new request can be acquired. --- src/v1/internal/pool.js | 7 +++-- test/internal/pool.test.js | 58 +++++++++++++++++++++++++++++++++++++- 2 files changed, 62 insertions(+), 3 deletions(-) diff --git a/src/v1/internal/pool.js b/src/v1/internal/pool.js index aa2b9f247..1f2da8566 100644 --- a/src/v1/internal/pool.js +++ b/src/v1/internal/pool.js @@ -158,13 +158,16 @@ class Pool { // check if there are any pending requests const requests = this._acquireRequests[key]; if (requests) { - const pending = requests.shift(); + const pending = requests[0]; if (pending) { const resource = this._acquire(key); if (resource) { - pending.resolve(resource); + // managed to acquire a valid resource from the pool to satisfy the pending acquire request + requests.shift(); // forget the pending request + pending.resolve(resource); // resolve the pending request with the acquired resource + // return early to not decrement the number of released resources return; } } else { diff --git a/test/internal/pool.test.js b/test/internal/pool.test.js index 0f46fecd2..91218f164 100644 --- a/test/internal/pool.test.js +++ b/test/internal/pool.test.js @@ -522,16 +522,72 @@ describe('Pool', () => { }); }); + it('should work fine when invalid resources released and acquisition attempt pending', done => { + const key = 'bolt://localhost:7687'; + const acquisitionTimeout = 1000; + let counter = 0; + + const pool = new Pool( + (url, release) => new Resource(url, counter++, release), + resource => { + }, + resourceValidOnlyOnceValidationFunction, + new PoolConfig(2, acquisitionTimeout) + ); + + pool.acquire(key).then(resource1 => { + expect(resource1.id).toEqual(0); + expect(pool.activeResourceCount(key)).toEqual(1); + + pool.acquire(key).then(resource2 => { + expect(resource2.id).toEqual(1); + expect(pool.activeResourceCount(key)).toEqual(2); + + // release both resources before the acquisition timeout, they should be treated as invalid + setTimeout(() => { + expectNumberOfAcquisitionRequests(pool, key, 1); + resource1.close(); + resource2.close(); + }, acquisitionTimeout / 2); + + pool.acquire(key).then(resource3 => { + expect(resource3.id).toEqual(2); + expectNoPendingAcquisitionRequests(pool); + expect(pool.activeResourceCount(key)).toEqual(1); + done(); + }).catch(error => { + done.fail(error); + }); + }); + }); + }); + }); function expectNoPendingAcquisitionRequests(pool) { - expect(pool._acquireRequests).toEqual({}); + const acquireRequests = pool._acquireRequests; + Object.values(acquireRequests).forEach(requests => { + if (Array.isArray(requests) && requests.length === 0) { + requests = undefined; + } + expect(requests).not.toBeDefined(); + }); } function expectNumberOfAcquisitionRequests(pool, key, expectedNumber) { expect(pool._acquireRequests[key].length).toEqual(expectedNumber); } +function resourceValidOnlyOnceValidationFunction(resource) { + // all resources are valid only once + if (resource.validatedOnce) { + return false; + } else { + resource.validatedOnce = true; + return true; + } +} + class Resource { constructor(key, id, release) { From e87c70c1f80f9520199ecd7a503564d18fb162b5 Mon Sep 17 00:00:00 2001 From: lutovich Date: Thu, 3 May 2018 15:56:32 +0200 Subject: [PATCH 2/2] Improve release of broken connection when pool is full Release operation discards the connection if it is broken. It also tries to resolve a pending acquisition request, if any. In order to this it tries to execute a promise-free internal acquire that immediately returns an available resource or creates a new one, if possible. New resource can only be created if pool is not at its maximum capacity. Previously, release operation only decremented the active connection counter at the end. This made it impossible to create a new connection when broken one is returned. This commit fixes the problem by making `Pool#_release()` decrement the active counter after connection has been either added to the list of idle connections or discarded because it turned out to be broken. --- src/v1/internal/pool.js | 7 ++----- test/internal/pool.test.js | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/src/v1/internal/pool.js b/src/v1/internal/pool.js index 1f2da8566..2288f5c8c 100644 --- a/src/v1/internal/pool.js +++ b/src/v1/internal/pool.js @@ -154,6 +154,7 @@ class Pool { // key has been purged, don't put it back, just destroy the resource this._destroy(resource); } + resourceReleased(key, this._activeResourceCounts); // check if there are any pending requests const requests = this._acquireRequests[key]; @@ -164,18 +165,14 @@ class Pool { const resource = this._acquire(key); if (resource) { // managed to acquire a valid resource from the pool to satisfy the pending acquire request + resourceAcquired(key, this._activeResourceCounts); // increment the active counter requests.shift(); // forget the pending request pending.resolve(resource); // resolve the pending request with the acquired resource - - // return early to not decrement the number of released resources - return; } } else { delete this._acquireRequests[key]; } } - - resourceReleased(key, this._activeResourceCounts); } } diff --git a/test/internal/pool.test.js b/test/internal/pool.test.js index 91218f164..12d2e9804 100644 --- a/test/internal/pool.test.js +++ b/test/internal/pool.test.js @@ -522,6 +522,40 @@ describe('Pool', () => { }); }); + it('should resolve pending acquisition request when single invalid resource returned', done => { + const key = 'bolt://localhost:7687'; + const acquisitionTimeout = 1000; + let counter = 0; + + const pool = new Pool( + (url, release) => new Resource(url, counter++, release), + resource => { + }, + resourceValidOnlyOnceValidationFunction, + new PoolConfig(1, acquisitionTimeout) + ); + + pool.acquire(key).then(resource1 => { + expect(resource1.id).toEqual(0); + expect(pool.activeResourceCount(key)).toEqual(1); + + // release the resource before the acquisition timeout, it should be treated as invalid + setTimeout(() => { + expectNumberOfAcquisitionRequests(pool, key, 1); + resource1.close(); + }, acquisitionTimeout / 2); + + pool.acquire(key).then(resource2 => { + expect(resource2.id).toEqual(1); + expectNoPendingAcquisitionRequests(pool); + expect(pool.activeResourceCount(key)).toEqual(1); + done(); + }).catch(error => { + done.fail(error); + }); + }); + }); + it('should work fine when invalid resources released and acquisition attempt pending', done => { const key = 'bolt://localhost:7687'; const acquisitionTimeout = 1000;