Skip to content

Propagate authentication errors #239

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 57 additions & 5 deletions src/v1/internal/connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {alloc} from './buf';
import {Node, Path, PathSegment, Relationship, UnboundRelationship} from '../graph-types';
import {newError} from './../error';
import ChannelConfig from './ch-config';
import StreamObserver from './stream-observer';

let Channel;
if( NodeChannel.available ) {
Expand Down Expand Up @@ -272,7 +273,7 @@ class Connection {
* failing, and the connection getting ejected from the session pool.
*
* @param err an error object, forwarded to all current and future subscribers
* @private
* @protected
*/
_handleFatalError( err ) {
this._isBroken = true;
Expand All @@ -289,6 +290,12 @@ class Connection {
}

_handleMessage( msg ) {
if (this._isBroken) {
// ignore all incoming messages when this connection is broken. all previously pending observers failed
// with the fatal error. all future observers will fail with same fatal error.
return;
}

const payload = msg.fields[0];

switch( msg.signature ) {
Expand All @@ -301,7 +308,7 @@ class Connection {
try {
this._currentObserver.onCompleted( payload );
} finally {
this._currentObserver = this._pendingObservers.shift();
this._updateCurrentObserver();
}
break;
case FAILURE:
Expand All @@ -310,7 +317,7 @@ class Connection {
this._currentFailure = newError(payload.message, payload.code);
this._currentObserver.onError( this._currentFailure );
} finally {
this._currentObserver = this._pendingObservers.shift();
this._updateCurrentObserver();
// Things are now broken. Pending observers will get FAILURE messages routed until
// We are done handling this failure.
if( !this._isHandlingFailure ) {
Expand Down Expand Up @@ -340,7 +347,7 @@ class Connection {
else if(this._currentObserver.onError)
this._currentObserver.onError(payload);
} finally {
this._currentObserver = this._pendingObservers.shift();
this._updateCurrentObserver();
}
break;
default:
Expand All @@ -351,7 +358,8 @@ class Connection {
/** Queue an INIT-message to be sent to the database */
initialize( clientName, token, observer ) {
log("C", "INIT", clientName, token);
this._queueObserver(observer);
const initObserver = new InitObserver(this, observer);
this._queueObserver(initObserver);
this._packer.packStruct( INIT, [this._packable(clientName), this._packable(token)],
(err) => this._handleFatalError(err) );
this._chunker.messageBoundary();
Expand Down Expand Up @@ -437,6 +445,14 @@ class Connection {
}
}

/**
* Pop next pending observer form the list of observers and make it current observer.
* @protected
*/
_updateCurrentObserver() {
this._currentObserver = this._pendingObservers.shift();
}

/**
* Synchronize - flush all queued outgoing messages and route their responses
* to their respective handlers.
Expand Down Expand Up @@ -489,6 +505,42 @@ function connect(url, config = {}, connectionErrorCode = null) {
return new Connection( new Ch(channelConfig), completeUrl);
}

/**
* Observer that wraps user-defined observer for INIT message and handles initialization failures. Connection is
* closed by the server if processing of INIT message fails so this observer will handle initialization failure
* as a fatal error.
*/
class InitObserver extends StreamObserver {

/**
* @constructor
* @param {Connection} connection the connection used to send INIT message.
* @param {StreamObserver} originalObserver the observer to wrap and delegate calls to.
*/
constructor(connection, originalObserver) {
super();
this._connection = connection;
this._originalObserver = originalObserver || NO_OP_OBSERVER;
}

onNext(record) {
this._originalObserver.onNext(record);
}

onError(error) {
this._connection._updateCurrentObserver(); // make sure this same observer will not be called again
try {
this._originalObserver.onError(error);
} finally {
this._connection._handleFatalError(error);
}
}

onCompleted(metaData) {
this._originalObserver.onCompleted(metaData);
}
}

export {
connect,
parseScheme,
Expand Down
11 changes: 8 additions & 3 deletions src/v1/internal/get-servers-util.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import Integer, {int} from '../integer';

const PROCEDURE_CALL = 'CALL dbms.cluster.routing.getServers';
const PROCEDURE_NOT_FOUND_CODE = 'Neo.ClientError.Procedure.ProcedureNotFound';
const UNAUTHORIZED_CODE = 'Neo.ClientError.Security.Unauthorized';

export default class GetServersUtil {

Expand All @@ -35,10 +36,14 @@ export default class GetServersUtil {
// throw when getServers procedure not found because this is clearly a configuration issue
throw newError('Server ' + routerAddress + ' could not perform routing. ' +
'Make sure you are connecting to a causal cluster', SERVICE_UNAVAILABLE);
} else if (error.code === UNAUTHORIZED_CODE) {
// auth error is a sign of a configuration issue, rediscovery should not proceed
throw error;
} else {
// return nothing when failed to connect because code higher in the callstack is still able to retry with a
// different session towards a different router
return null;
}
// return nothing when failed to connect because code higher in the callstack is still able to retry with a
// different session towards a different router
return null;
});
}

Expand Down
48 changes: 48 additions & 0 deletions test/internal/connector.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,54 @@ describe('connector', () => {
channel.onmessage(packedFailureMessage(errorCode, errorMessage));
});

it('should notify provided observer when connection initialization completes', done => {
const connection = connect('bolt://localhost');

connection.initialize('mydriver/0.0.0', basicAuthToken(), {
onCompleted: metaData => {
expect(connection.isOpen()).toBeTruthy();
expect(metaData).toBeDefined();
done();
},
});
});

it('should notify provided observer when connection initialization fails', done => {
const connection = connect('bolt://localhost:7474'); // wrong port

connection.initialize('mydriver/0.0.0', basicAuthToken(), {
onError: error => {
expect(connection.isOpen()).toBeFalsy();
expect(error).toBeDefined();
done();
},
});
});

it('should fail all new observers after initialization error', done => {
const connection = connect('bolt://localhost:7474'); // wrong port

connection.initialize('mydriver/0.0.0', basicAuthToken(), {
onError: initialError => {
expect(initialError).toBeDefined();

connection.run('RETURN 1', {}, {
onError: error1 => {
expect(error1).toEqual(initialError);

connection.initialize('mydriver/0.0.0', basicAuthToken(), {
onError: error2 => {
expect(error2).toEqual(initialError);

done();
}
});
}
});
},
});
});

function packedHandshakeMessage() {
const result = alloc(4);
result.putInt32(0, 1);
Expand Down
6 changes: 6 additions & 0 deletions test/resources/boltkit/failed_auth.script
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
!: AUTO RESET
!: AUTO PULL_ALL

C: INIT "neo4j-javascript/0.0.0-dev" {"credentials": "neo4j", "scheme": "basic", "principal": "neo4j"}
S: FAILURE {"code": "Neo.ClientError.Security.Unauthorized", "message": "Some server auth error message"}
S: <EXIT>
16 changes: 15 additions & 1 deletion test/v1/driver.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ describe('driver', () => {

it('should fail early on wrong credentials', done => {
// Given
driver = neo4j.driver("bolt://localhost", neo4j.auth.basic("neo4j", "who would use such a password"));
driver = neo4j.driver("bolt://localhost", wrongCredentials());

// Expect
driver.onError = err => {
Expand All @@ -93,6 +93,16 @@ describe('driver', () => {
startNewTransaction(driver);
});

it('should fail queries on wrong credentials', done => {
driver = neo4j.driver("bolt://localhost", wrongCredentials());

const session = driver.session();
session.run('RETURN 1').catch(error => {
expect(error.code).toEqual('Neo.ClientError.Security.Unauthorized');
done();
});
});

it('should indicate success early on correct credentials', done => {
// Given
driver = neo4j.driver("bolt://localhost", sharedNeo4j.authToken);
Expand Down Expand Up @@ -207,4 +217,8 @@ describe('driver', () => {
expect(session.beginTransaction()).toBeDefined();
}

function wrongCredentials() {
return neo4j.auth.basic('neo4j', 'who would use such a password');
}

});
27 changes: 27 additions & 0 deletions test/v1/routing.driver.boltkit.it.js
Original file line number Diff line number Diff line change
Expand Up @@ -1549,6 +1549,33 @@ describe('routing driver', () => {
});
});

it('should fail rediscovery on auth error', done => {
if (!boltkit.BoltKitSupport) {
done();
return;
}

const kit = new boltkit.BoltKit();
const router = kit.start('./test/resources/boltkit/failed_auth.script', 9010);

kit.run(() => {
const driver = newDriver('bolt+routing://127.0.0.1:9010');
const session = driver.session();
session.run('RETURN 1').catch(error => {
expect(error.code).toEqual('Neo.ClientError.Security.Unauthorized');
expect(error.message).toEqual('Some server auth error message');

session.close(() => {
driver.close();
router.exit(code => {
expect(code).toEqual(0);
done();
});
});
});
});
});

function moveNextDateNow30SecondsForward() {
const currentTime = Date.now();
hijackNextDateNowCall(currentTime + 30 * 1000 + 1);
Expand Down