From 0e3b1f796f1587b08ef3e6e4d1f99572a3153016 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Barc=C3=A9los?= Date: Fri, 15 Dec 2023 13:03:39 +0100 Subject: [PATCH] Fix connection receive timeout for single requests Resets and non-pipeline requests were not triggering the connection receive timeout. The problem happened because of the timeout mechanics only started when pending observers are set. However, the current observer should be also be considered for this use case. --- .../src/bolt/response-handler.js | 10 ++++-- .../src/connection/connection-channel.js | 2 +- .../test/bolt/response-handler.test.js | 33 +++++++++++++++++++ .../bolt-connection/bolt/response-handler.js | 10 ++++-- .../connection/connection-channel.js | 2 +- 5 files changed, 49 insertions(+), 8 deletions(-) diff --git a/packages/bolt-connection/src/bolt/response-handler.js b/packages/bolt-connection/src/bolt/response-handler.js index 4e540cfb4..6944c8ed4 100644 --- a/packages/bolt-connection/src/bolt/response-handler.js +++ b/packages/bolt-connection/src/bolt/response-handler.js @@ -76,7 +76,7 @@ export default class ResponseHandler { this._transformMetadata = transformMetadata || NO_OP_IDENTITY this._observer = Object.assign( { - onPendingObserversChange: NO_OP, + onObserversCountChange: NO_OP, onError: NO_OP, onFailure: NO_OP, onErrorApplyTransformation: NO_OP_IDENTITY @@ -156,7 +156,11 @@ export default class ResponseHandler { */ _updateCurrentObserver () { this._currentObserver = this._pendingObservers.shift() - this._observer.onPendingObserversChange(this._pendingObservers.length) + this._observer.onObserversCountChange(this._observersCount) + } + + get _observersCount () { + return this._currentObserver == null ? this._pendingObservers.length : this._pendingObservers.length + 1 } _queueObserver (observer) { @@ -169,7 +173,7 @@ export default class ResponseHandler { } else { this._pendingObservers.push(observer) } - this._observer.onPendingObserversChange(this._pendingObservers.length) + this._observer.onObserversCountChange(this._observersCount) return true } diff --git a/packages/bolt-connection/src/connection/connection-channel.js b/packages/bolt-connection/src/connection/connection-channel.js index f291b7ec3..43f4fe121 100644 --- a/packages/bolt-connection/src/connection/connection-channel.js +++ b/packages/bolt-connection/src/connection/connection-channel.js @@ -67,7 +67,7 @@ export function createChannelConnection ( server: conn.server, log: conn.logger, observer: { - onPendingObserversChange: conn._handleOngoingRequestsNumberChange.bind(conn), + onObserversCountChange: conn._handleOngoingRequestsNumberChange.bind(conn), onError: conn._handleFatalError.bind(conn), onFailure: conn._resetOnFailure.bind(conn), onProtocolError: conn._handleProtocolError.bind(conn), diff --git a/packages/bolt-connection/test/bolt/response-handler.test.js b/packages/bolt-connection/test/bolt/response-handler.test.js index e6d55cd42..3c75c1b98 100644 --- a/packages/bolt-connection/test/bolt/response-handler.test.js +++ b/packages/bolt-connection/test/bolt/response-handler.test.js @@ -22,6 +22,7 @@ const { logger: { Logger } } = internal +const SUCCESS = 0x70 // 0111 0000 // SUCCESS const FAILURE = 0x7f // 0111 1111 // FAILURE describe('response-handler', () => { @@ -69,4 +70,36 @@ describe('response-handler', () => { expect(receivedError.code).toBe(expectedError.code) }) }) + + it('should keep track of observers and notify onObserversCountChange()', () => { + const observer = { + onObserversCountChange: jest.fn() + } + const responseHandler = new ResponseHandler({ observer, log: Logger.noOp() }) + + responseHandler._queueObserver({}) + expect(observer.onObserversCountChange).toHaveBeenLastCalledWith(1) + + responseHandler._queueObserver({}) + expect(observer.onObserversCountChange).toHaveBeenLastCalledWith(2) + + responseHandler._queueObserver({}) + expect(observer.onObserversCountChange).toHaveBeenLastCalledWith(3) + + const success = { + signature: SUCCESS, + fields: [{}] + } + + responseHandler.handleResponse(success) + expect(observer.onObserversCountChange).toHaveBeenLastCalledWith(2) + + responseHandler.handleResponse(success) + expect(observer.onObserversCountChange).toHaveBeenLastCalledWith(1) + + responseHandler.handleResponse(success) + expect(observer.onObserversCountChange).toHaveBeenLastCalledWith(0) + + expect(observer.onObserversCountChange).toHaveBeenCalledTimes(6) + }) }) diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/bolt/response-handler.js b/packages/neo4j-driver-deno/lib/bolt-connection/bolt/response-handler.js index decc35b07..8a0aeddbf 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/bolt/response-handler.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/bolt/response-handler.js @@ -76,7 +76,7 @@ export default class ResponseHandler { this._transformMetadata = transformMetadata || NO_OP_IDENTITY this._observer = Object.assign( { - onPendingObserversChange: NO_OP, + onObserversCountChange: NO_OP, onError: NO_OP, onFailure: NO_OP, onErrorApplyTransformation: NO_OP_IDENTITY @@ -156,7 +156,11 @@ export default class ResponseHandler { */ _updateCurrentObserver () { this._currentObserver = this._pendingObservers.shift() - this._observer.onPendingObserversChange(this._pendingObservers.length) + this._observer.onObserversCountChange(this._observersCount) + } + + get _observersCount () { + return this._currentObserver == null ? this._pendingObservers.length : this._pendingObservers.length + 1 } _queueObserver (observer) { @@ -169,7 +173,7 @@ export default class ResponseHandler { } else { this._pendingObservers.push(observer) } - this._observer.onPendingObserversChange(this._pendingObservers.length) + this._observer.onObserversCountChange(this._observersCount) return true } diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection-channel.js b/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection-channel.js index 6a4dbe193..eb6cc3777 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection-channel.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection-channel.js @@ -67,7 +67,7 @@ export function createChannelConnection ( server: conn.server, log: conn.logger, observer: { - onPendingObserversChange: conn._handleOngoingRequestsNumberChange.bind(conn), + onObserversCountChange: conn._handleOngoingRequestsNumberChange.bind(conn), onError: conn._handleFatalError.bind(conn), onFailure: conn._resetOnFailure.bind(conn), onProtocolError: conn._handleProtocolError.bind(conn),