Skip to content

Improve failure handling for RESET and ACK_FAILURE #376

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 30, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 12 additions & 15 deletions src/v1/internal/connection-holder.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ export default class ConnectionHolder {

this._referenceCount--;
if (this._referenceCount === 0) {
// release a connection without muting ACK_FAILURE, this is the last action on this connection
return this._releaseConnection(true);
return this._releaseConnection();
}
return this._connectionPromise;
}
Expand All @@ -85,9 +84,7 @@ export default class ConnectionHolder {
return this._connectionPromise;
}
this._referenceCount = 0;
// release a connection and mute ACK_FAILURE, this might be called concurrently with other
// operations and thus should ignore failure handling
return this._releaseConnection(false);
return this._releaseConnection();
}

/**
Expand All @@ -97,19 +94,16 @@ export default class ConnectionHolder {
* @return {Promise} - promise resolved then connection is returned to the pool.
* @private
*/
_releaseConnection(sync) {
_releaseConnection() {
this._connectionPromise = this._connectionPromise.then(connection => {
if (connection) {
if(sync) {
connection.reset();
} else {
connection.resetAsync();
}
connection.sync();
connection._release();
return connection.resetAndFlush()
.catch(ignoreError)
.then(() => connection._release());
} else {
return Promise.resolve();
}
}).catch(ignoredError => {
});
}).catch(ignoreError);

return this._connectionPromise;
}
Expand All @@ -134,6 +128,9 @@ class EmptyConnectionHolder extends ConnectionHolder {
}
}

function ignoreError(error) {
}

/**
* Connection holder that does not manage any connections.
* @type {ConnectionHolder}
Expand Down
169 changes: 102 additions & 67 deletions src/v1/internal/connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import NodeChannel from './ch-node';
import {Chunker, Dechunker} from './chunking';
import packStreamUtil from './packstream-util';
import {alloc} from './buf';
import {newError} from './../error';
import {newError, PROTOCOL_ERROR} from './../error';
import ChannelConfig from './ch-config';
import urlUtil from './url-util';
import StreamObserver from './stream-observer';
Expand Down Expand Up @@ -120,7 +120,7 @@ class Connection {
this._packer = packStreamUtil.createLatestPacker(this._chunker);
this._unpacker = packStreamUtil.createLatestUnpacker(disableLosslessIntegers);

this._isHandlingFailure = false;
this._ackFailureMuted = false;
this._currentFailure = null;

this._state = new ConnectionState(this);
Expand Down Expand Up @@ -241,25 +241,8 @@ class Connection {
this._currentObserver.onError( this._currentFailure );
} finally {
this._updateCurrentObserver();
// Things are now broken. Pending observers will get FAILURE messages routed until
// We are done handling this failure.
if( !this._isHandlingFailure ) {
this._isHandlingFailure = true;

// isHandlingFailure was false, meaning this is the first failure message
// we see from this failure. We may see several others, one for each message
// we had "optimistically" already sent after whatever it was that failed.
// We only want to and need to ACK the first one, which is why we are tracking
// this _isHandlingFailure thing.
this._ackFailure({
onNext: NO_OP,
onError: NO_OP,
onCompleted: () => {
this._isHandlingFailure = false;
this._currentFailure = null;
}
});
}
// Things are now broken. Pending observers will get FAILURE messages routed until we are done handling this failure.
this._ackFailureIfNeeded();
}
break;
case IGNORED:
Expand All @@ -268,7 +251,7 @@ class Connection {
if (this._currentFailure && this._currentObserver.onError)
this._currentObserver.onError(this._currentFailure);
else if(this._currentObserver.onError)
this._currentObserver.onError(payload);
this._currentObserver.onError(newError('Ignored either because of an error or RESET'));
} finally {
this._updateCurrentObserver();
}
Expand All @@ -282,80 +265,122 @@ class Connection {
initialize( clientName, token, observer ) {
log("C", "INIT", clientName, token);
const initObserver = this._state.wrap(observer);
this._queueObserver(initObserver);
this._packer.packStruct( INIT, [this._packable(clientName), this._packable(token)],
(err) => this._handleFatalError(err) );
this._chunker.messageBoundary();
this.sync();
const queued = this._queueObserver(initObserver);
if (queued) {
this._packer.packStruct(INIT, [this._packable(clientName), this._packable(token)],
(err) => this._handleFatalError(err));
this._chunker.messageBoundary();
this.sync();
}
}

/** Queue a RUN-message to be sent to the database */
run( statement, params, observer ) {
log("C", "RUN", statement, params);
this._queueObserver(observer);
this._packer.packStruct( RUN, [this._packable(statement), this._packable(params)],
(err) => this._handleFatalError(err) );
this._chunker.messageBoundary();
const queued = this._queueObserver(observer);
if (queued) {
this._packer.packStruct(RUN, [this._packable(statement), this._packable(params)],
(err) => this._handleFatalError(err));
this._chunker.messageBoundary();
}
}

/** Queue a PULL_ALL-message to be sent to the database */
pullAll( observer ) {
log("C", "PULL_ALL");
this._queueObserver(observer);
this._packer.packStruct( PULL_ALL, [], (err) => this._handleFatalError(err) );
this._chunker.messageBoundary();
const queued = this._queueObserver(observer);
if (queued) {
this._packer.packStruct(PULL_ALL, [], (err) => this._handleFatalError(err));
this._chunker.messageBoundary();
}
}

/** Queue a DISCARD_ALL-message to be sent to the database */
discardAll( observer ) {
log("C", "DISCARD_ALL");
this._queueObserver(observer);
this._packer.packStruct( DISCARD_ALL, [], (err) => this._handleFatalError(err) );
this._chunker.messageBoundary();
const queued = this._queueObserver(observer);
if (queued) {
this._packer.packStruct(DISCARD_ALL, [], (err) => this._handleFatalError(err));
this._chunker.messageBoundary();
}
}

/** Queue a RESET-message to be sent to the database. Mutes failure handling. */
resetAsync( observer ) {
log("C", "RESET_ASYNC");
this._isHandlingFailure = true;
let self = this;
let wrappedObs = {
onNext: observer ? observer.onNext : NO_OP,
onError: observer ? observer.onError : NO_OP,
onCompleted: () => {
self._isHandlingFailure = false;
if (observer) {
observer.onCompleted();
/**
* Send a RESET-message to the database. Mutes failure handling.
* Message is immediately flushed to the network. Separate {@link Connection#sync()} call is not required.
* @return {Promise<void>} promise resolved when SUCCESS-message response arrives, or failed when other response messages arrives.
*/
resetAndFlush() {
log('C', 'RESET');
this._ackFailureMuted = true;

return new Promise((resolve, reject) => {
const observer = {
onNext: record => {
const neo4jError = this._handleProtocolError('Received RECORD as a response for RESET: ' + JSON.stringify(record));
reject(neo4jError);
},
onError: error => {
if (this._isBroken) {
// handling a fatal error, no need to raise a protocol violation
reject(error);
} else {
const neo4jError = this._handleProtocolError('Received FAILURE as a response for RESET: ' + error);
reject(neo4jError);
}
},
onCompleted: () => {
this._ackFailureMuted = false;
resolve();
}
};
const queued = this._queueObserver(observer);
if (queued) {
this._packer.packStruct(RESET, [], err => this._handleFatalError(err));
this._chunker.messageBoundary();
this.sync();
}
};
this._queueObserver(wrappedObs);
this._packer.packStruct( RESET, [], (err) => this._handleFatalError(err) );
this._chunker.messageBoundary();
});
}

/** Queue a RESET-message to be sent to the database */
reset(observer) {
log('C', 'RESET');
this._queueObserver(observer);
this._packer.packStruct(RESET, [], (err) => this._handleFatalError(err));
this._chunker.messageBoundary();
}
_ackFailureIfNeeded() {
if (this._ackFailureMuted) {
return;
}

/** Queue a ACK_FAILURE-message to be sent to the database */
_ackFailure( observer ) {
log("C", "ACK_FAILURE");
this._queueObserver(observer);
this._packer.packStruct( ACK_FAILURE, [], (err) => this._handleFatalError(err) );
this._chunker.messageBoundary();
log('C', 'ACK_FAILURE');

const observer = {
onNext: record => {
this._handleProtocolError('Received RECORD as a response for ACK_FAILURE: ' + JSON.stringify(record));
},
onError: error => {
if (!this._isBroken && !this._ackFailureMuted) {
// not handling a fatal error and RESET did not cause the given error - looks like a protocol violation
this._handleProtocolError('Received FAILURE as a response for ACK_FAILURE: ' + error);
} else {
this._currentFailure = null;
}
},
onCompleted: () => {
this._currentFailure = null;
}
};

const queued = this._queueObserver(observer);
if (queued) {
this._packer.packStruct(ACK_FAILURE, [], err => this._handleFatalError(err));
this._chunker.messageBoundary();
this.sync();
}
}

_queueObserver(observer) {
if( this._isBroken ) {
if( observer && observer.onError ) {
observer.onError(this._error);
}
return;
return false;
}
observer = observer || NO_OP_OBSERVER;
observer.onCompleted = observer.onCompleted || NO_OP;
Expand All @@ -366,6 +391,7 @@ class Connection {
} else {
this._pendingObservers.push( observer );
}
return true;
}

/**
Expand Down Expand Up @@ -427,6 +453,15 @@ class Connection {
}
}
}

_handleProtocolError(message) {
this._ackFailureMuted = false;
this._currentFailure = null;
this._updateCurrentObserver();
const error = newError(message, PROTOCOL_ERROR);
this._handleFatalError(error);
return error;
}
}

class ConnectionState {
Expand Down
6 changes: 3 additions & 3 deletions test/internal/connection-holder.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ describe('ConnectionHolder', () => {
connectionHolder.initializeConnection();

connectionHolder.close().then(() => {
expect(connection.isReleasedOnceOnSessionClose()).toBeTruthy();
expect(connection.isReleasedOnce()).toBeTruthy();
done();
});
});
Expand Down Expand Up @@ -201,11 +201,11 @@ describe('ConnectionHolder', () => {
connectionHolder.initializeConnection();

connectionHolder.close().then(() => {
expect(connection1.isReleasedOnceOnSessionClose()).toBeTruthy();
expect(connection1.isReleasedOnce()).toBeTruthy();

connectionHolder.initializeConnection();
connectionHolder.close().then(() => {
expect(connection2.isReleasedOnceOnSessionClose()).toBeTruthy();
expect(connection2.isReleasedOnce()).toBeTruthy();
done();
});
});
Expand Down
Loading