diff --git a/packages/bolt-connection/src/bolt/response-handler.js b/packages/bolt-connection/src/bolt/response-handler.js index df775f140..38b611874 100644 --- a/packages/bolt-connection/src/bolt/response-handler.js +++ b/packages/bolt-connection/src/bolt/response-handler.js @@ -78,6 +78,7 @@ export default class ResponseHandler { this._transformMetadata = transformMetadata || NO_OP_IDENTITY this._observer = Object.assign( { + onPendingObserversChange: NO_OP, onError: NO_OP, onFailure: NO_OP, onErrorApplyTransformation: NO_OP_IDENTITY @@ -156,6 +157,7 @@ export default class ResponseHandler { */ _updateCurrentObserver () { this._currentObserver = this._pendingObservers.shift() + this._observer.onPendingObserversChange(this._pendingObservers.length) } _queueObserver (observer) { @@ -168,6 +170,7 @@ export default class ResponseHandler { } else { this._pendingObservers.push(observer) } + this._observer.onPendingObserversChange(this._pendingObservers.length) return true } diff --git a/packages/bolt-connection/src/channel/browser/browser-channel.js b/packages/bolt-connection/src/channel/browser/browser-channel.js index 469f5300d..9476c0325 100644 --- a/packages/bolt-connection/src/channel/browser/browser-channel.js +++ b/packages/bolt-connection/src/channel/browser/browser-channel.js @@ -181,6 +181,18 @@ export default class WebSocketChannel { */ setupReceiveTimeout (receiveTimeout) {} + /** + * Stops the receive timeout for the channel. + */ + stopReceiveTimeout() { + } + + /** + * Start the receive timeout for the channel. + */ + startReceiveTimeout () { + } + /** * Set connection timeout on the given WebSocket, if configured. * @return {number} the timeout id or null. diff --git a/packages/bolt-connection/src/channel/node/node-channel.js b/packages/bolt-connection/src/channel/node/node-channel.js index 4ffba285c..75d5d2ed6 100644 --- a/packages/bolt-connection/src/channel/node/node-channel.js +++ b/packages/bolt-connection/src/channel/node/node-channel.js @@ -242,6 +242,8 @@ export default class NodeChannel { this ) this._connectionErrorCode = config.connectionErrorCode + this._receiveTimeout = null + this._receiveTimeoutStarted = false this._conn = connect( config, @@ -353,7 +355,27 @@ export default class NodeChannel { ) }) - this._conn.setTimeout(receiveTimeout) + this._receiveTimeout = receiveTimeout + } + + /** + * Stops the receive timeout for the channel. + */ + stopReceiveTimeout() { + if (this._receiveTimeout !== null && this._receiveTimeoutStarted) { + this._receiveTimeoutStarted = false + this._conn.setTimeout(0) + } + } + + /** + * Start the receive timeout for the channel. + */ + startReceiveTimeout () { + if (this._receiveTimeout !== null && !this._receiveTimeoutStarted) { + this._receiveTimeoutStarted = true + this._conn.setTimeout(this._receiveTimeout) + } } /** diff --git a/packages/bolt-connection/src/connection/connection-channel.js b/packages/bolt-connection/src/connection/connection-channel.js index b72cecb8d..a00b01288 100644 --- a/packages/bolt-connection/src/connection/connection-channel.js +++ b/packages/bolt-connection/src/connection/connection-channel.js @@ -69,6 +69,7 @@ export function createChannelConnection ( server: conn.server, log: conn.logger, observer: { + onPendingObserversChange: conn._handleOngoingRequestsNumberChange.bind(conn), onError: conn._handleFatalError.bind(conn), onFailure: conn._resetOnFailure.bind(conn), onProtocolError: conn._handleProtocolError.bind(conn), @@ -350,6 +351,18 @@ export default class ChannelConnection extends Connection { return !this._isBroken && this._ch._open } + /** + * Starts and stops the receive timeout timer. + * @param {number} requestsNumber Ongoing requests number + */ + _handleOngoingRequestsNumberChange(requestsNumber) { + if (requestsNumber === 0) { + this._ch.stopReceiveTimeout() + } else { + this._ch.startReceiveTimeout() + } + } + /** * Call close on the channel. * @returns {Promise} - A promise that will be resolved when the underlying channel is closed. diff --git a/packages/bolt-connection/test/channel/node/node-channel.test.js b/packages/bolt-connection/test/channel/node/node-channel.test.js index 5d6b175b1..1bd4f3abb 100644 --- a/packages/bolt-connection/test/channel/node/node-channel.test.js +++ b/packages/bolt-connection/test/channel/node/node-channel.test.js @@ -59,13 +59,13 @@ describe('NodeChannel', () => { }) describe('.setupReceiveTimeout()', () => { - it('should call socket.setTimeout(receiveTimeout)', () => { + it('should not call socket.setTimeout(receiveTimeout)', () => { const receiveTimeout = 42 const channel = createMockedChannel(true) channel.setupReceiveTimeout(receiveTimeout) - expect(channel._conn.getCalls().setTimeout[1]).toEqual([receiveTimeout]) + expect(channel._conn.getCalls().setTimeout.length).toEqual(1) }) it('should unsubscribe to the on connect and on timeout created on the create socket', () => { @@ -108,6 +108,122 @@ describe('NodeChannel', () => { expect(channel._conn.getCalls().off).toEqual([]) }) }) + + describe('.startReceiveTimeout()', () => { + describe('receive timeout is setup', () => { + it('should call socket.setTimeout(receiveTimeout) when called first', () => { + const { receiveTimeout, channel } = setup() + + channel.startReceiveTimeout() + + expect(channel._conn.getCalls().setTimeout.length).toEqual(2) + expect(channel._conn.getCalls().setTimeout[1]).toEqual([receiveTimeout]) + }) + + it ('should not call socket.setTimeout(receiveTimeout) if stream already started', () => { + const { receiveTimeout, channel } = setup() + + // setup + channel.startReceiveTimeout() + expect(channel._conn.getCalls().setTimeout.length).toEqual(2) + expect(channel._conn.getCalls().setTimeout[1]).toEqual([receiveTimeout]) + + // start again + channel.startReceiveTimeout() + + expect(channel._conn.getCalls().setTimeout.length).toEqual(2) + expect(channel._conn.getCalls().setTimeout[1]).toEqual([receiveTimeout]) + }) + + it ('should call socket.setTimeout(receiveTimeout) when after stop', () => { + const { receiveTimeout, channel } = setup() + + // setup + channel.startReceiveTimeout() + expect(channel._conn.getCalls().setTimeout.length).toEqual(2) + expect(channel._conn.getCalls().setTimeout[1]).toEqual([receiveTimeout]) + channel.stopReceiveTimeout() + expect(channel._conn.getCalls().setTimeout.length).toEqual(3) + expect(channel._conn.getCalls().setTimeout[2]).toEqual([0]) + + // start again + channel.startReceiveTimeout() + + expect(channel._conn.getCalls().setTimeout.length).toEqual(4) + expect(channel._conn.getCalls().setTimeout[3]).toEqual([receiveTimeout]) + }) + + function setup () { + const channel = createMockedChannel(true) + const receiveTimeout = 42 + channel.setupReceiveTimeout(receiveTimeout) + return {channel, receiveTimeout} + } + }) + + describe('receive timemout is not setup', () => { + it ('should call not socket.setTimeout(receiveTimeout) when not started', () => { + const channel = createMockedChannel(true) + + // start again + channel.startReceiveTimeout() + + expect(channel._conn.getCalls().setTimeout.length).toEqual(1) + }) + }) + }) + + describe('.stopReceiveTimeout()', () => { + describe('when receive timeout is setup', () => { + it ('should not call socket.setTimeout(0) when not started', () => { + const { channel } = setup() + + channel.stopReceiveTimeout() + + expect(channel._conn.getCalls().setTimeout.length).toEqual(1) + }) + + it ('should call socket.setTimeout(0) when already started', () => { + const { channel } = setup() + + channel.startReceiveTimeout() + + channel.stopReceiveTimeout() + + expect(channel._conn.getCalls().setTimeout.length).toEqual(3) + expect(channel._conn.getCalls().setTimeout[2]).toEqual([0]) + }) + + it ('should not call socket.setTimeout(0) when already stopped', () => { + const { channel } = setup() + + channel.startReceiveTimeout() + channel.stopReceiveTimeout() + + channel.stopReceiveTimeout() + + expect(channel._conn.getCalls().setTimeout.length).toEqual(3) + }) + + function setup () { + const channel = createMockedChannel(true) + const receiveTimeout = 42 + channel.setupReceiveTimeout(receiveTimeout) + return {channel, receiveTimeout} + } + }) + + describe('when receive timeout is not setup', () => { + it ('should not call socket.setTimeout(0)', () => { + const channel = createMockedChannel(true) + + channel.startReceiveTimeout() + channel.stopReceiveTimeout() + + expect(channel._conn.getCalls().setTimeout.length).toEqual(1) + }) + }) + }) }) function createMockedChannel (connected, config = {}) { diff --git a/packages/bolt-connection/test/connection/connection-channel.test.js b/packages/bolt-connection/test/connection/connection-channel.test.js index f5ffbe46d..7cb0dc447 100644 --- a/packages/bolt-connection/test/connection/connection-channel.test.js +++ b/packages/bolt-connection/test/connection/connection-channel.test.js @@ -67,7 +67,7 @@ describe('ChannelConnection', () => { [{ hints: { 'connection.recv_timeout_seconds': 0n } }], [{ hints: { 'connection.recv_timeout_seconds': int(0) } }] ])( - 'should call not call this._ch.setupReceiveTimeout() when onComplete metadata is %o', + 'should not call this._ch.setupReceiveTimeout() when onComplete metadata is %o', async metadata => { const channel = { setupReceiveTimeout: jest.fn().mockName('setupReceiveTimeout') @@ -286,6 +286,60 @@ describe('ChannelConnection', () => { }) }) + describe('.__handleOngoingRequestsNumberChange()', () => { + it('should call channel.stopReceiveTimeout when requets number equals to 0', () => { + const channel = { + stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'), + startReceiveTimeout: jest.fn().mockName('startReceiveTimeout') + } + const connection = spyOnConnectionChannel({ channel, protocolSupplier: () => undefined }) + + connection._handleOngoingRequestsNumberChange(0) + + expect(channel.stopReceiveTimeout).toHaveBeenCalledTimes(1) + }) + + it('should not call channel.startReceiveTimeout when requets number equals to 0', () => { + const channel = { + stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'), + startReceiveTimeout: jest.fn().mockName('startReceiveTimeout') + } + const connection = spyOnConnectionChannel({ channel, protocolSupplier: () => undefined }) + + connection._handleOngoingRequestsNumberChange(0) + + expect(channel.startReceiveTimeout).toHaveBeenCalledTimes(0) + }) + + it.each([ + [1], [2], [3], [5], [8], [13], [3000] + ])('should call channel.startReceiveTimeout when requets number equals to %d', (requests) => { + const channel = { + stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'), + startReceiveTimeout: jest.fn().mockName('startReceiveTimeout') + } + const connection = spyOnConnectionChannel({ channel, protocolSupplier: () => undefined }) + + connection._handleOngoingRequestsNumberChange(requests) + + expect(channel.startReceiveTimeout).toHaveBeenCalledTimes(1) + }) + + it.each([ + [1], [2], [3], [5], [8], [13], [3000] + ])('should not call channel.stopReceiveTimeout when requets number equals to %d', (requests) => { + const channel = { + stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'), + startReceiveTimeout: jest.fn().mockName('startReceiveTimeout') + } + const connection = spyOnConnectionChannel({ channel, protocolSupplier: () => undefined }) + + connection._handleOngoingRequestsNumberChange(requests) + + expect(channel.stopReceiveTimeout).toHaveBeenCalledTimes(0) + }) + }) + function spyOnConnectionChannel ({ channel, errorHandler, diff --git a/packages/neo4j-driver/test/internal/dummy-channel.js b/packages/neo4j-driver/test/internal/dummy-channel.js index e57ca1ca1..3fddfca1e 100644 --- a/packages/neo4j-driver/test/internal/dummy-channel.js +++ b/packages/neo4j-driver/test/internal/dummy-channel.js @@ -36,6 +36,10 @@ export default class DummyChannel { this.written.push(buf) } + stopReceiveTimeout () {} + + startReceiveTimeout () {} + toHex () { let out = '' for (let i = 0; i < this.written.length; i++) {