diff --git a/src/v1/internal/connection-holder.js b/src/v1/internal/connection-holder.js index cc1212c5d..9ded035af 100644 --- a/src/v1/internal/connection-holder.js +++ b/src/v1/internal/connection-holder.js @@ -70,8 +70,7 @@ export default class ConnectionHolder { this._referenceCount--; if (this._referenceCount === 0) { - // release a connection without muting ACK_FAILURE, this is the last action on this connection - return this._releaseConnection(true); + return this._releaseConnection(); } return this._connectionPromise; } @@ -85,9 +84,7 @@ export default class ConnectionHolder { return this._connectionPromise; } this._referenceCount = 0; - // release a connection and mute ACK_FAILURE, this might be called concurrently with other - // operations and thus should ignore failure handling - return this._releaseConnection(false); + return this._releaseConnection(); } /** @@ -97,19 +94,16 @@ export default class ConnectionHolder { * @return {Promise} - promise resolved then connection is returned to the pool. * @private */ - _releaseConnection(sync) { + _releaseConnection() { this._connectionPromise = this._connectionPromise.then(connection => { if (connection) { - if(sync) { - connection.reset(); - } else { - connection.resetAsync(); - } - connection.sync(); - connection._release(); + return connection.resetAndFlush() + .catch(ignoreError) + .then(() => connection._release()); + } else { + return Promise.resolve(); } - }).catch(ignoredError => { - }); + }).catch(ignoreError); return this._connectionPromise; } @@ -134,6 +128,9 @@ class EmptyConnectionHolder extends ConnectionHolder { } } +function ignoreError(error) { +} + /** * Connection holder that does not manage any connections. * @type {ConnectionHolder} diff --git a/src/v1/internal/connector.js b/src/v1/internal/connector.js index 1ffcf9a1f..e35f2d7e4 100644 --- a/src/v1/internal/connector.js +++ b/src/v1/internal/connector.js @@ -22,7 +22,7 @@ import NodeChannel from './ch-node'; import {Chunker, Dechunker} from './chunking'; import packStreamUtil from './packstream-util'; import {alloc} from './buf'; -import {newError} from './../error'; +import {newError, PROTOCOL_ERROR} from './../error'; import ChannelConfig from './ch-config'; import urlUtil from './url-util'; import StreamObserver from './stream-observer'; @@ -120,7 +120,7 @@ class Connection { this._packer = packStreamUtil.createLatestPacker(this._chunker); this._unpacker = packStreamUtil.createLatestUnpacker(disableLosslessIntegers); - this._isHandlingFailure = false; + this._ackFailureMuted = false; this._currentFailure = null; this._state = new ConnectionState(this); @@ -241,25 +241,8 @@ class Connection { this._currentObserver.onError( this._currentFailure ); } finally { this._updateCurrentObserver(); - // Things are now broken. Pending observers will get FAILURE messages routed until - // We are done handling this failure. - if( !this._isHandlingFailure ) { - this._isHandlingFailure = true; - - // isHandlingFailure was false, meaning this is the first failure message - // we see from this failure. We may see several others, one for each message - // we had "optimistically" already sent after whatever it was that failed. - // We only want to and need to ACK the first one, which is why we are tracking - // this _isHandlingFailure thing. - this._ackFailure({ - onNext: NO_OP, - onError: NO_OP, - onCompleted: () => { - this._isHandlingFailure = false; - this._currentFailure = null; - } - }); - } + // Things are now broken. Pending observers will get FAILURE messages routed until we are done handling this failure. + this._ackFailureIfNeeded(); } break; case IGNORED: @@ -268,7 +251,7 @@ class Connection { if (this._currentFailure && this._currentObserver.onError) this._currentObserver.onError(this._currentFailure); else if(this._currentObserver.onError) - this._currentObserver.onError(payload); + this._currentObserver.onError(newError('Ignored either because of an error or RESET')); } finally { this._updateCurrentObserver(); } @@ -282,72 +265,114 @@ class Connection { initialize( clientName, token, observer ) { log("C", "INIT", clientName, token); const initObserver = this._state.wrap(observer); - this._queueObserver(initObserver); - this._packer.packStruct( INIT, [this._packable(clientName), this._packable(token)], - (err) => this._handleFatalError(err) ); - this._chunker.messageBoundary(); - this.sync(); + const queued = this._queueObserver(initObserver); + if (queued) { + this._packer.packStruct(INIT, [this._packable(clientName), this._packable(token)], + (err) => this._handleFatalError(err)); + this._chunker.messageBoundary(); + this.sync(); + } } /** Queue a RUN-message to be sent to the database */ run( statement, params, observer ) { log("C", "RUN", statement, params); - this._queueObserver(observer); - this._packer.packStruct( RUN, [this._packable(statement), this._packable(params)], - (err) => this._handleFatalError(err) ); - this._chunker.messageBoundary(); + const queued = this._queueObserver(observer); + if (queued) { + this._packer.packStruct(RUN, [this._packable(statement), this._packable(params)], + (err) => this._handleFatalError(err)); + this._chunker.messageBoundary(); + } } /** Queue a PULL_ALL-message to be sent to the database */ pullAll( observer ) { log("C", "PULL_ALL"); - this._queueObserver(observer); - this._packer.packStruct( PULL_ALL, [], (err) => this._handleFatalError(err) ); - this._chunker.messageBoundary(); + const queued = this._queueObserver(observer); + if (queued) { + this._packer.packStruct(PULL_ALL, [], (err) => this._handleFatalError(err)); + this._chunker.messageBoundary(); + } } /** Queue a DISCARD_ALL-message to be sent to the database */ discardAll( observer ) { log("C", "DISCARD_ALL"); - this._queueObserver(observer); - this._packer.packStruct( DISCARD_ALL, [], (err) => this._handleFatalError(err) ); - this._chunker.messageBoundary(); + const queued = this._queueObserver(observer); + if (queued) { + this._packer.packStruct(DISCARD_ALL, [], (err) => this._handleFatalError(err)); + this._chunker.messageBoundary(); + } } - /** Queue a RESET-message to be sent to the database. Mutes failure handling. */ - resetAsync( observer ) { - log("C", "RESET_ASYNC"); - this._isHandlingFailure = true; - let self = this; - let wrappedObs = { - onNext: observer ? observer.onNext : NO_OP, - onError: observer ? observer.onError : NO_OP, - onCompleted: () => { - self._isHandlingFailure = false; - if (observer) { - observer.onCompleted(); + /** + * Send a RESET-message to the database. Mutes failure handling. + * Message is immediately flushed to the network. Separate {@link Connection#sync()} call is not required. + * @return {Promise} promise resolved when SUCCESS-message response arrives, or failed when other response messages arrives. + */ + resetAndFlush() { + log('C', 'RESET'); + this._ackFailureMuted = true; + + return new Promise((resolve, reject) => { + const observer = { + onNext: record => { + const neo4jError = this._handleProtocolError('Received RECORD as a response for RESET: ' + JSON.stringify(record)); + reject(neo4jError); + }, + onError: error => { + if (this._isBroken) { + // handling a fatal error, no need to raise a protocol violation + reject(error); + } else { + const neo4jError = this._handleProtocolError('Received FAILURE as a response for RESET: ' + error); + reject(neo4jError); + } + }, + onCompleted: () => { + this._ackFailureMuted = false; + resolve(); } + }; + const queued = this._queueObserver(observer); + if (queued) { + this._packer.packStruct(RESET, [], err => this._handleFatalError(err)); + this._chunker.messageBoundary(); + this.sync(); } - }; - this._queueObserver(wrappedObs); - this._packer.packStruct( RESET, [], (err) => this._handleFatalError(err) ); - this._chunker.messageBoundary(); + }); } - /** Queue a RESET-message to be sent to the database */ - reset(observer) { - log('C', 'RESET'); - this._queueObserver(observer); - this._packer.packStruct(RESET, [], (err) => this._handleFatalError(err)); - this._chunker.messageBoundary(); - } + _ackFailureIfNeeded() { + if (this._ackFailureMuted) { + return; + } - /** Queue a ACK_FAILURE-message to be sent to the database */ - _ackFailure( observer ) { - log("C", "ACK_FAILURE"); - this._queueObserver(observer); - this._packer.packStruct( ACK_FAILURE, [], (err) => this._handleFatalError(err) ); - this._chunker.messageBoundary(); + log('C', 'ACK_FAILURE'); + + const observer = { + onNext: record => { + this._handleProtocolError('Received RECORD as a response for ACK_FAILURE: ' + JSON.stringify(record)); + }, + onError: error => { + if (!this._isBroken && !this._ackFailureMuted) { + // not handling a fatal error and RESET did not cause the given error - looks like a protocol violation + this._handleProtocolError('Received FAILURE as a response for ACK_FAILURE: ' + error); + } else { + this._currentFailure = null; + } + }, + onCompleted: () => { + this._currentFailure = null; + } + }; + + const queued = this._queueObserver(observer); + if (queued) { + this._packer.packStruct(ACK_FAILURE, [], err => this._handleFatalError(err)); + this._chunker.messageBoundary(); + this.sync(); + } } _queueObserver(observer) { @@ -355,7 +380,7 @@ class Connection { if( observer && observer.onError ) { observer.onError(this._error); } - return; + return false; } observer = observer || NO_OP_OBSERVER; observer.onCompleted = observer.onCompleted || NO_OP; @@ -366,6 +391,7 @@ class Connection { } else { this._pendingObservers.push( observer ); } + return true; } /** @@ -427,6 +453,15 @@ class Connection { } } } + + _handleProtocolError(message) { + this._ackFailureMuted = false; + this._currentFailure = null; + this._updateCurrentObserver(); + const error = newError(message, PROTOCOL_ERROR); + this._handleFatalError(error); + return error; + } } class ConnectionState { diff --git a/test/internal/connection-holder.test.js b/test/internal/connection-holder.test.js index e57306cc5..8a269f62e 100644 --- a/test/internal/connection-holder.test.js +++ b/test/internal/connection-holder.test.js @@ -168,7 +168,7 @@ describe('ConnectionHolder', () => { connectionHolder.initializeConnection(); connectionHolder.close().then(() => { - expect(connection.isReleasedOnceOnSessionClose()).toBeTruthy(); + expect(connection.isReleasedOnce()).toBeTruthy(); done(); }); }); @@ -201,11 +201,11 @@ describe('ConnectionHolder', () => { connectionHolder.initializeConnection(); connectionHolder.close().then(() => { - expect(connection1.isReleasedOnceOnSessionClose()).toBeTruthy(); + expect(connection1.isReleasedOnce()).toBeTruthy(); connectionHolder.initializeConnection(); connectionHolder.close().then(() => { - expect(connection2.isReleasedOnceOnSessionClose()).toBeTruthy(); + expect(connection2.isReleasedOnce()).toBeTruthy(); done(); }); }); diff --git a/test/internal/connector.test.js b/test/internal/connector.test.js index 4c8880453..c738f981b 100644 --- a/test/internal/connector.test.js +++ b/test/internal/connector.test.js @@ -22,11 +22,16 @@ import {connect, Connection} from '../../src/v1/internal/connector'; import {Packer} from '../../src/v1/internal/packstream-v1'; import {Chunker} from '../../src/v1/internal/chunking'; import {alloc} from '../../src/v1/internal/buf'; -import {Neo4jError} from '../../src/v1/error'; +import {Neo4jError, newError} from '../../src/v1/error'; import sharedNeo4j from '../internal/shared-neo4j'; import {ServerVersion} from '../../src/v1/internal/server-version'; import lolex from 'lolex'; +const ILLEGAL_MESSAGE = {signature: 42, fields: []}; +const SUCCESS_MESSAGE = {signature: 0x70, fields: [{}]}; +const FAILURE_MESSAGE = {signature: 0x7F, fields: [newError('Hello')]}; +const RECORD_MESSAGE = {signature: 0x71, fields: [{value: 'Hello'}]}; + describe('connector', () => { let clock; @@ -241,6 +246,104 @@ describe('connector', () => { testConnectionTimeout(true, done); }); + it('should not queue INIT observer when broken', () => { + testQueueingOfObserversWithBrokenConnection(connection => connection.initialize('Hello', {}, {})); + }); + + it('should not queue RUN observer when broken', () => { + testQueueingOfObserversWithBrokenConnection(connection => connection.run('RETURN 1', {}, {})); + }); + + it('should not queue PULL_ALL observer when broken', () => { + testQueueingOfObserversWithBrokenConnection(connection => connection.pullAll({})); + }); + + it('should not queue DISCARD_ALL observer when broken', () => { + testQueueingOfObserversWithBrokenConnection(connection => connection.discardAll({})); + }); + + it('should not queue RESET observer when broken', () => { + const resetAction = connection => connection.resetAndFlush().catch(ignore => { + }); + + testQueueingOfObserversWithBrokenConnection(resetAction); + }); + + it('should not queue ACK_FAILURE observer when broken', () => { + testQueueingOfObserversWithBrokenConnection(connection => connection._ackFailureIfNeeded()); + }); + + it('should reset and flush when SUCCESS received', done => { + connection = connect('bolt://localhost'); + + connection.resetAndFlush().then(() => { + expect(connection.isOpen()).toBeTruthy(); + done(); + }).catch(error => done.fail(error)); + + connection._handleMessage(SUCCESS_MESSAGE); + }); + + it('should fail to reset and flush when FAILURE received', done => { + connection = connect('bolt://localhost'); + + connection.resetAndFlush() + .then(() => done.fail('Should fail')) + .catch(error => { + expect(error.message).toEqual('Received FAILURE as a response for RESET: Neo4jError: Hello'); + expect(connection._isBroken).toBeTruthy(); + expect(connection.isOpen()).toBeFalsy(); + done(); + }); + + connection._handleMessage(FAILURE_MESSAGE); + }); + + it('should fail to reset and flush when RECORD received', done => { + connection = connect('bolt://localhost'); + + connection.resetAndFlush() + .then(() => done.fail('Should fail')) + .catch(error => { + expect(error.message).toEqual('Received RECORD as a response for RESET: {"value":"Hello"}'); + expect(connection._isBroken).toBeTruthy(); + expect(connection.isOpen()).toBeFalsy(); + done(); + }); + + connection._handleMessage(RECORD_MESSAGE); + }); + + it('should ACK_FAILURE when SUCCESS received', () => { + connection = connect('bolt://localhost'); + + connection._currentFailure = newError('Hello'); + connection._ackFailureIfNeeded(); + + connection._handleMessage(SUCCESS_MESSAGE); + expect(connection._currentFailure).toBeNull(); + }); + + it('should fail the connection when ACK_FAILURE receives FAILURE', () => { + connection = connect('bolt://localhost'); + + connection._ackFailureIfNeeded(); + + connection._handleMessage(FAILURE_MESSAGE); + expect(connection._isBroken).toBeTruthy(); + expect(connection.isOpen()).toBeFalsy(); + }); + + it('should fail the connection when ACK_FAILURE receives RECORD', () => { + connection = connect('bolt://localhost'); + + connection._ackFailureIfNeeded(); + + connection._handleMessage(RECORD_MESSAGE); + expect(connection._isBroken).toBeTruthy(); + expect(connection.isOpen()).toBeFalsy(); + }); + function packedHandshakeMessage() { const result = alloc(4); result.putInt32(0, 1); @@ -301,4 +404,15 @@ describe('connector', () => { }); } + function testQueueingOfObserversWithBrokenConnection(connectionAction) { + connection = connect('bolt://localhost'); + + connection._handleMessage(ILLEGAL_MESSAGE); + expect(connection.isOpen()).toBeFalsy(); + + expect(connection._pendingObservers.length).toEqual(0); + connectionAction(connection); + expect(connection._pendingObservers.length).toEqual(0); + } + }); diff --git a/test/internal/fake-connection.js b/test/internal/fake-connection.js index 284a24ea8..7816866aa 100644 --- a/test/internal/fake-connection.js +++ b/test/internal/fake-connection.js @@ -31,7 +31,6 @@ export default class FakeConnection { this.creationTimestamp = Date.now(); this.resetInvoked = 0; - this.resetAsyncInvoked = 0; this.syncInvoked = 0; this.releaseInvoked = 0; this.initializationInvoked = 0; @@ -54,8 +53,9 @@ export default class FakeConnection { this.resetInvoked++; } - resetAsync() { - this.resetAsyncInvoked++; + resetAndFlush() { + this.resetInvoked++; + return Promise.resolve(); } sync() { @@ -75,17 +75,6 @@ export default class FakeConnection { return this._open; } - isReleasedOnceOnSessionClose() { - return this.isReleasedOnSessionCloseTimes(1); - } - - isReleasedOnSessionCloseTimes(times) { - return this.resetAsyncInvoked === times && - this.resetInvoked === 0 && - this.syncInvoked === times && - this.releaseInvoked === times; - } - isNeverReleased() { return this.isReleasedTimes(0); } @@ -95,10 +84,7 @@ export default class FakeConnection { } isReleasedTimes(times) { - return this.resetAsyncInvoked === 0 && - this.resetInvoked === times && - this.syncInvoked === times && - this.releaseInvoked === times; + return this.resetInvoked === times && this.releaseInvoked === times; } withServerVersion(version) { diff --git a/test/resources/boltstub/reset_error.script b/test/resources/boltstub/reset_error.script new file mode 100644 index 000000000..e335d5c59 --- /dev/null +++ b/test/resources/boltstub/reset_error.script @@ -0,0 +1,9 @@ +!: AUTO INIT + +C: RUN "RETURN 42 AS answer" {} + PULL_ALL +S: SUCCESS {"fields": ["answer"]} + RECORD [42] + SUCCESS {} +C: RESET +S: FAILURE {"code": "Neo.TransientError.General.DatabaseUnavailable", "message": "Unable to reset"} diff --git a/test/v1/direct.driver.boltkit.test.js b/test/v1/direct.driver.boltkit.test.js index 76d743c8e..c443c1a31 100644 --- a/test/v1/direct.driver.boltkit.test.js +++ b/test/v1/direct.driver.boltkit.test.js @@ -274,4 +274,34 @@ describe('direct driver with stub server', () => { }); }); + it('should close connection when RESET fails', done => { + if (!boltStub.supported) { + done(); + return; + } + + const server = boltStub.start('./test/resources/boltstub/reset_error.script', 9001); + + boltStub.run(() => { + const driver = boltStub.newDriver('bolt://127.0.0.1:9001'); + const session = driver.session(); + + session.run('RETURN 42 AS answer').then(result => { + const records = result.records; + expect(records.length).toEqual(1); + expect(records[0].get(0).toNumber()).toEqual(42); + session.close(() => { + + expect(driver._pool._pools['127.0.0.1:9001'].length).toEqual(0); + driver.close(); + server.exit(code => { + expect(code).toEqual(0); + done(); + }); + + }); + }).catch(error => done.fail(error)); + }); + }); + }); diff --git a/test/v1/session.test.js b/test/v1/session.test.js index 6a9150fba..3a88b086a 100644 --- a/test/v1/session.test.js +++ b/test/v1/session.test.js @@ -77,13 +77,13 @@ describe('session', () => { const session = newSessionWithConnection(connection); session.close(() => { - expect(connection.isReleasedOnceOnSessionClose()).toBeTruthy(); + expect(connection.isReleasedOnce()).toBeTruthy(); session.close(() => { - expect(connection.isReleasedOnceOnSessionClose()).toBeTruthy(); + expect(connection.isReleasedOnce()).toBeTruthy(); session.close(() => { - expect(connection.isReleasedOnceOnSessionClose()).toBeTruthy(); + expect(connection.isReleasedOnce()).toBeTruthy(); done(); }); });