From 986b4b0e437c7b0c7092ed0cfd680fb9af2dabaa Mon Sep 17 00:00:00 2001 From: lutovich Date: Mon, 27 Aug 2018 15:30:13 +0200 Subject: [PATCH] Improve ordering of connection initialization Make sure every connection acquired from the connection pool is fully initialized and authenticated. Before this change, a connection was created in a synchronous way. Then protocol handshake and INIT message were sent asynchronously. So creation of a connection looked like a sync operation but it is not. Authentication info was also sent with INIT message regardless of the Bolt protocol negotiation. This worked fine for Bolt V1 and V2 because they used same INIT message. However, for Bolt V3 initialization has to happen after protocol version negotiation because the initialization message is different. Commit also moves error handling to the connection layer. Before, error handling/processing was in both `StreamObserver` and `Transaction`. It's mostly needed to handle failures in RoutingDriver (forget address from the routing table). Now handling of errors will be done in a single place - `Connection` object using a special `ConnectionErrorHandler`. This handler is different for direct driver and routing drivers. --- src/v1/driver.js | 45 +- src/v1/internal/connection-error-handler.js | 74 +++ src/v1/internal/connection-holder.js | 2 +- src/v1/internal/connection-providers.js | 40 +- .../internal/{connector.js => connection.js} | 327 ++++++-------- src/v1/internal/http/http-driver.js | 2 +- src/v1/internal/pool.js | 124 ++++-- src/v1/internal/protocol-handshaker.js | 10 - src/v1/internal/request-message.js | 11 +- src/v1/internal/routing-util.js | 4 - src/v1/internal/stream-observer.js | 22 +- src/v1/routing-driver.js | 70 +-- src/v1/session.js | 13 +- src/v1/transaction.js | 6 +- test/internal/bolt-protocol-v1.test.js | 1 - .../internal/connection-error-handler.test.js | 76 ++++ test/internal/connection-holder.test.js | 17 +- test/internal/connection-providers.test.js | 10 +- test/internal/connection.test.js | 421 ++++++++++++++++++ test/internal/connector.test.js | 388 ---------------- test/internal/fake-connection.js | 13 - test/internal/pool.test.js | 38 +- test/internal/protocol-handshaker.test.js | 10 - test/internal/request-message.test.js | 4 - test/internal/shared-neo4j.js | 2 +- test/internal/stream-observer.test.js | 7 +- test/v1/session.test.js | 25 +- 27 files changed, 918 insertions(+), 844 deletions(-) create mode 100644 src/v1/internal/connection-error-handler.js rename src/v1/internal/{connector.js => connection.js} (56%) create mode 100644 test/internal/connection-error-handler.test.js create mode 100644 test/internal/connection.test.js delete mode 100644 test/internal/connector.test.js diff --git a/src/v1/driver.js b/src/v1/driver.js index 585cd3cc5..8b4be63f3 100644 --- a/src/v1/driver.js +++ b/src/v1/driver.js @@ -19,7 +19,7 @@ import Session from './session'; import Pool from './internal/pool'; -import {connect} from './internal/connector'; +import Connection from './internal/connection'; import StreamObserver from './internal/stream-observer'; import {newError, SERVICE_UNAVAILABLE} from './error'; import {DirectConnectionProvider} from './internal/connection-providers'; @@ -27,6 +27,7 @@ import Bookmark from './internal/bookmark'; import ConnectivityVerifier from './internal/connectivity-verifier'; import PoolConfig, {DEFAULT_ACQUISITION_TIMEOUT, DEFAULT_MAX_SIZE} from './internal/pool-config'; import Logger from './internal/logger'; +import ConnectionErrorHandler from './internal/connection-error-handler'; const DEFAULT_MAX_CONNECTION_LIFETIME = 60 * 60 * 1000; // 1 hour @@ -62,18 +63,18 @@ class Driver { * @constructor * @param {string} hostPort * @param {string} userAgent - * @param {object} token + * @param {object} authToken * @param {object} config * @protected */ - constructor(hostPort, userAgent, token = {}, config = {}) { + constructor(hostPort, userAgent, authToken = {}, config = {}) { sanitizeConfig(config); this._id = idGenerator++; this._hostPort = hostPort; this._userAgent = userAgent; this._openConnections = {}; - this._token = token; + this._authToken = authToken; this._config = config; this._log = Logger.create(config); this._pool = new Pool( @@ -127,18 +128,24 @@ class Driver { } /** - * Create a new connection instance. - * @return {Connection} new connector-api session instance, a low level session API. + * Create a new connection and initialize it. + * @return {Promise} promise resolved with a new connection or rejected when failed to connect. * @access private */ _createConnection(hostPort, release) { - const conn = connect(hostPort, this._config, this._connectionErrorCode(), this._log); - const streamObserver = new _ConnectionStreamObserver(this, conn); - conn.protocol().initialize(this._userAgent, this._token, streamObserver); - conn._release = () => release(hostPort, conn); - - this._openConnections[conn.id] = conn; - return conn; + const connection = Connection.create(hostPort, this._config, this._createConnectionErrorHandler(), this._log); + connection._release = () => release(hostPort, connection); + this._openConnections[connection.id] = connection; + + return connection.connect(this._userAgent, this._authToken) + .catch(error => { + if (this.onError) { + // notify Driver.onError callback about connection initialization errors + this.onError(error); + } + // propagate the error because connection failed to connect / initialize + throw error; + }); } /** @@ -186,7 +193,7 @@ class Driver { const sessionMode = Driver._validateSessionMode(mode); const connectionProvider = this._getOrCreateConnectionProvider(); const bookmark = new Bookmark(bookmarkOrBookmarks); - return this._createSession(sessionMode, connectionProvider, bookmark, this._config); + return new Session(sessionMode, connectionProvider, bookmark, this._config); } static _validateSessionMode(rawMode) { @@ -203,14 +210,8 @@ class Driver { } // Extension point - _createSession(mode, connectionProvider, bookmark, config) { - return new Session(mode, connectionProvider, bookmark, config); - } - - // Extension point - _connectionErrorCode() { - // connection errors might result in different error codes depending on the driver - return SERVICE_UNAVAILABLE; + _createConnectionErrorHandler() { + return new ConnectionErrorHandler(SERVICE_UNAVAILABLE); } _getOrCreateConnectionProvider() { diff --git a/src/v1/internal/connection-error-handler.js b/src/v1/internal/connection-error-handler.js new file mode 100644 index 000000000..b8ee33507 --- /dev/null +++ b/src/v1/internal/connection-error-handler.js @@ -0,0 +1,74 @@ +/** + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import {SERVICE_UNAVAILABLE, SESSION_EXPIRED} from '../error'; + +export default class ConnectionErrorHandler { + + constructor(errorCode, handleUnavailability, handleWriteFailure) { + this._errorCode = errorCode; + this._handleUnavailability = handleUnavailability || noOpHandler; + this._handleWriteFailure = handleWriteFailure || noOpHandler; + } + + /** + * Error code to use for network errors. + * @return {string} the error code. + */ + errorCode() { + return this._errorCode; + } + + /** + * Handle and transform the error. + * @param {Neo4jError} error the original error. + * @param {string} hostPort the host and port of the connection where the error happened. + * @return {Neo4jError} new error that should be propagated to the user. + */ + handleAndTransformError(error, hostPort) { + if (isAvailabilityError(error)) { + return this._handleUnavailability(error, hostPort); + } + if (isFailureToWrite(error)) { + return this._handleWriteFailure(error, hostPort); + } + return error; + } +} + +function isAvailabilityError(error) { + if (error) { + return error.code === SESSION_EXPIRED || + error.code === SERVICE_UNAVAILABLE || + error.code === 'Neo.TransientError.General.DatabaseUnavailable'; + } + return false; +} + +function isFailureToWrite(error) { + if (error) { + return error.code === 'Neo.ClientError.Cluster.NotALeader' || + error.code === 'Neo.ClientError.General.ForbiddenOnReadOnlyDatabase'; + } + return false; +} + +function noOpHandler(error) { + return error; +} diff --git a/src/v1/internal/connection-holder.js b/src/v1/internal/connection-holder.js index 9ded035af..2c88ce4e8 100644 --- a/src/v1/internal/connection-holder.js +++ b/src/v1/internal/connection-holder.js @@ -55,7 +55,7 @@ export default class ConnectionHolder { getConnection(streamObserver) { return this._connectionPromise.then(connection => { streamObserver.resolveConnection(connection); - return connection.initializationCompleted(); + return connection; }); } diff --git a/src/v1/internal/connection-providers.js b/src/v1/internal/connection-providers.js index ca74b53c4..566ce9b2e 100644 --- a/src/v1/internal/connection-providers.js +++ b/src/v1/internal/connection-providers.js @@ -26,6 +26,8 @@ import hasFeature from './features'; import {DnsHostNameResolver, DummyHostNameResolver} from './host-name-resolvers'; import RoutingUtil from './routing-util'; +const UNAUTHORIZED_ERROR_CODE = 'Neo.ClientError.Security.Unauthorized'; + class ConnectionProvider { acquireConnection(mode) { @@ -195,20 +197,32 @@ export class LoadBalancer extends ConnectionProvider { // try next router return this._createSessionForRediscovery(currentRouter).then(session => { - return this._rediscovery.lookupRoutingTableOnRouter(session, currentRouter) + if (session) { + return this._rediscovery.lookupRoutingTableOnRouter(session, currentRouter); + } else { + // unable to acquire connection and create session towards the current router + // return null to signal that the next router should be tried + return null; + } }); }); }, Promise.resolve(null)); } _createSessionForRediscovery(routerAddress) { - return this._connectionPool.acquire(routerAddress).then(connection => { - // initialized connection is required for routing procedure call - // server version needs to be known to decide which routing procedure to use - const initializedConnectionPromise = connection.initializationCompleted(); - const connectionProvider = new SingleConnectionProvider(initializedConnectionPromise); - return new Session(READ, connectionProvider); - }); + return this._connectionPool.acquire(routerAddress) + .then(connection => { + const connectionProvider = new SingleConnectionProvider(connection); + return new Session(READ, connectionProvider); + }) + .catch(error => { + // unable to acquire connection towards the given router + if (error && error.code === UNAUTHORIZED_ERROR_CODE) { + // auth error is a sign of a configuration issue, rediscovery should not proceed + throw error; + } + return null; + }); } _applyRoutingTableIfPossible(newRoutingTable) { @@ -257,14 +271,14 @@ export class LoadBalancer extends ConnectionProvider { export class SingleConnectionProvider extends ConnectionProvider { - constructor(connectionPromise) { + constructor(connection) { super(); - this._connectionPromise = connectionPromise; + this._connection = connection; } acquireConnection(mode) { - const connectionPromise = this._connectionPromise; - this._connectionPromise = null; - return connectionPromise; + const connection = this._connection; + this._connection = null; + return Promise.resolve(connection); } } diff --git a/src/v1/internal/connector.js b/src/v1/internal/connection.js similarity index 56% rename from src/v1/internal/connector.js rename to src/v1/internal/connection.js index 5d7ee53c6..69880bde3 100644 --- a/src/v1/internal/connector.js +++ b/src/v1/internal/connection.js @@ -49,7 +49,7 @@ const FAILURE = 0x7F; // 0111 1111 // FAILURE function NO_OP(){} -let NO_OP_OBSERVER = { +const NO_OP_OBSERVER = { onNext : NO_OP, onCompleted : NO_OP, onError : NO_OP @@ -57,33 +57,22 @@ let NO_OP_OBSERVER = { let idGenerator = 0; -/** - * A connection manages sending and receiving messages over a channel. A - * connector is very closely tied to the Bolt protocol, it implements the - * same message structure with very little frills. This means Connectors are - * naturally tied to a specific version of the protocol, and we expect - * another layer will be needed to support multiple versions. - * - * The connector tries to batch outbound messages by requiring its users - * to call 'sync' when messages need to be sent, and it routes response - * messages back to the originators of the requests that created those - * response messages. - * @access private - */ -class Connection { +export default class Connection { /** * @constructor * @param {NodeChannel|WebSocketChannel} channel - channel with a 'write' function and a 'onmessage' callback property. + * @param {ConnectionErrorHandler} errorHandler the error handler. * @param {string} hostPort - the hostname and port to connect to. * @param {Logger} log - the configured logger. * @param {boolean} disableLosslessIntegers if this connection should convert all received integers to native JS numbers. */ - constructor(channel, hostPort, log, disableLosslessIntegers = false) { + constructor(channel, errorHandler, hostPort, log, disableLosslessIntegers = false) { this.id = idGenerator++; this.hostPort = hostPort; this.server = {address: hostPort}; this.creationTimestamp = Date.now(); + this._errorHandler = errorHandler; this._disableLosslessIntegers = disableLosslessIntegers; this._pendingObservers = []; this._currentObserver = undefined; @@ -92,43 +81,107 @@ class Connection { this._chunker = new Chunker( channel ); this._log = log; - const protocolHandshaker = new ProtocolHandshaker(this, channel, this._chunker, this._disableLosslessIntegers, this._log); - // initially assume that database supports latest Bolt version, create latest packer and unpacker - this._protocol = protocolHandshaker.createLatestProtocol(); + // bolt protocol is initially not initialized + this._protocol = null; + // error extracted from a FAILURE message this._currentFailure = null; - this._state = new ConnectionState(this); - - // Set to true on fatal errors, to get this out of session pool. + // Set to true on fatal errors, to get this out of connection pool. this._isBroken = false; - // TODO: Using `onmessage` and `onerror` came from the WebSocket API, - // it reads poorly and has several annoying drawbacks. Swap to having - // Channel extend EventEmitter instead, then we can use `on('data',..)` - this._ch.onmessage = buffer => this._initializeProtocol(buffer, protocolHandshaker); - - // Listen to connection errors. Important note though; - // In some cases we will get a channel that is already broken (for instance, - // if the user passes invalid configuration options). In this case, onerror - // will have "already triggered" before we add out listener here. So the line - // below also checks that the channel is not already failed. This could be nicely - // encapsulated into Channel if we used `on('error', ..)` rather than `onerror=..` - // as outlined in the comment about `onmessage` further up in this file. - this._ch.onerror = this._handleFatalError.bind(this); - if( this._ch._error ) { - this._handleFatalError(this._ch._error); - } - - this._dechunker.onmessage = (buf) => { - this._handleMessage(this._protocol.unpacker().unpack(buf)); - }; - if (this._log.isDebugEnabled()) { this._log.debug(`${this} created towards ${hostPort}`); } + } + + /** + * Crete new connection to the provided address. Returned connection is not connected. + * @param {string} url - the Bolt endpoint to connect to. + * @param {object} config - this driver configuration. + * @param {ConnectionErrorHandler} errorHandler - the error handler for connection errors. + * @param {Logger} log - configured logger. + * @return {Connection} - new connection. + */ + static create(url, config, errorHandler, log) { + const Ch = config.channel || Channel; + const parsedAddress = urlUtil.parseDatabaseUrl(url); + const channelConfig = new ChannelConfig(parsedAddress, config, errorHandler.errorCode()); + return new Connection(new Ch(channelConfig), errorHandler, parsedAddress.hostAndPort, log, config.disableLosslessIntegers); + } + + /** + * Connect to the target address, negotiate Bolt protocol and send initialization message. + * @param {string} userAgent the user agent for this driver. + * @param {object} authToken the object containing auth information. + * @return {Promise} promise resolved with the current connection if connection is successful. Rejected promise otherwise. + */ + connect(userAgent, authToken) { + return this._negotiateProtocol().then(() => this._initialize(userAgent, authToken)); + } + + /** + * Execute Bolt protocol handshake to initialize the protocol version. + * @return {Promise} promise resolved with the current connection if handshake is successful. Rejected promise otherwise. + */ + _negotiateProtocol() { + const protocolHandshaker = new ProtocolHandshaker(this, this._ch, this._chunker, this._disableLosslessIntegers, this._log); + + return new Promise((resolve, reject) => { + + const handshakeErrorHandler = error => { + this._handleFatalError(error); + reject(error); + }; + + this._ch.onerror = handshakeErrorHandler.bind(this); + if (this._ch._error) { + // channel is already broken + handshakeErrorHandler(this._ch._error); + } + + this._ch.onmessage = buffer => { + try { + // read the response buffer and initialize the protocol + this._protocol = protocolHandshaker.createNegotiatedProtocol(buffer); + + // reset the error handler to just handle errors and forget about the handshake promise + this._ch.onerror = this._handleFatalError.bind(this); + + // Ok, protocol running. Simply forward all messages to the dechunker + this._ch.onmessage = buf => this._dechunker.write(buf); + + // setup dechunker to dechunk messages and forward them to the message handler + this._dechunker.onmessage = (buf) => { + this._handleMessage(this._protocol.unpacker().unpack(buf)); + }; + // forward all pending bytes to the dechunker + if (buffer.hasRemaining()) { + this._dechunker.write(buffer.readSlice(buffer.remaining())); + } + + resolve(this); + } catch (e) { + this._handleFatalError(e); + reject(e); + } + }; - protocolHandshaker.writeHandshakeRequest(); + protocolHandshaker.writeHandshakeRequest(); + }); + } + + /** + * Perform protocol-specific initialization which includes authentication. + * @param {string} userAgent the user agent for this driver. + * @param {object} authToken the object containing auth information. + * @return {Promise} promise resolved with the current connection if initialization is successful. Rejected promise otherwise. + */ + _initialize(userAgent, authToken) { + return new Promise((resolve, reject) => { + const observer = new InitializationObserver(this, resolve, reject); + this._protocol.initialize(userAgent, authToken, observer); + }); } /** @@ -146,10 +199,6 @@ class Connection { * @param {boolean} flush true if flush should happen after the message is written to the buffer. */ write(message, observer, flush) { - if (message.isInitializationMessage) { - observer = this._state.wrap(observer); - } - const queued = this._queueObserver(observer); if (queued) { @@ -170,51 +219,29 @@ class Connection { } } - /** - * Complete protocol initialization. - * @param {BaseBuffer} buffer the handshake response buffer. - * @param {ProtocolHandshaker} protocolHandshaker the handshaker utility. - * @private - */ - _initializeProtocol(buffer, protocolHandshaker) { - try { - // re-assign the protocol because version might be lower than we initially assumed - this._protocol = protocolHandshaker.createNegotiatedProtocol(buffer); - - // Ok, protocol running. Simply forward all messages to the dechunker - this._ch.onmessage = buf => this._dechunker.write(buf); - - if (buffer.hasRemaining()) { - this._dechunker.write(buffer.readSlice(buffer.remaining())); - } - } catch (e) { - this._handleFatalError(e); - } - } - /** * "Fatal" means the connection is dead. Only call this if something * happens that cannot be recovered from. This will lead to all subscribers * failing, and the connection getting ejected from the session pool. * - * @param err an error object, forwarded to all current and future subscribers + * @param error an error object, forwarded to all current and future subscribers * @protected */ - _handleFatalError( err ) { + _handleFatalError(error) { this._isBroken = true; - this._error = err; + this._error = this._errorHandler.handleAndTransformError(error, this.hostPort); if (this._log.isErrorEnabled()) { - this._log.error(`${this} experienced a fatal error ${JSON.stringify(err)}`); + this._log.error(`${this} experienced a fatal error ${JSON.stringify(this._error)}`); } - if( this._currentObserver && this._currentObserver.onError ) { - this._currentObserver.onError(err); + if (this._currentObserver && this._currentObserver.onError) { + this._currentObserver.onError(this._error); } - while( this._pendingObservers.length > 0 ) { + while (this._pendingObservers.length > 0) { let observer = this._pendingObservers.shift(); - if( observer && observer.onError ) { - observer.onError(err); + if (observer && observer.onError) { + observer.onError(this._error); } } } @@ -250,7 +277,8 @@ class Connection { this._log.debug(`${this} S: FAILURE ${JSON.stringify(msg)}`); } try { - this._currentFailure = newError(payload.message, payload.code); + const error = newError(payload.message, payload.code); + this._currentFailure = this._errorHandler.handleAndTransformError(error, this.hostPort); this._currentObserver.onError( this._currentFailure ); } finally { this._updateCurrentObserver(); @@ -337,15 +365,6 @@ class Connection { return true; } - /** - * Get promise resolved when connection initialization succeed or rejected when it fails. - * Connection is initialized using {@link BoltProtocol#initialize()} function. - * @return {Promise} the result of connection initialization. - */ - initializationCompleted() { - return this._state.initializationCompleted(); - } - /* * Pop next pending observer form the list of observers and make it current observer. * @protected @@ -378,20 +397,6 @@ class Connection { return this._protocol.packer().packable(value, (err) => this._handleFatalError(err)); } - /** - * @protected - */ - _markInitialized(metadata) { - const serverVersion = metadata ? metadata.server : null; - if (!this.server.version) { - this.server.version = serverVersion; - const version = ServerVersion.fromString(serverVersion); - if (version.compareTo(VERSION_3_2_0) < 0) { - this._protocol.packer().disableByteArrays(); - } - } - } - _handleProtocolError(message) { this._currentFailure = null; this._updateCurrentObserver(); @@ -401,110 +406,36 @@ class Connection { } } -class ConnectionState { +class InitializationObserver { - /** - * @constructor - * @param {Connection} connection the connection to track state for. - */ - constructor(connection) { + constructor(connection, onSuccess, onError) { this._connection = connection; - - this._initRequested = false; - this._initError = null; - - this._resolveInitPromise = null; - this._rejectInitPromise = null; - this._initPromise = new Promise((resolve, reject) => { - this._resolveInitPromise = resolve; - this._rejectInitPromise = reject; - }); + this._onSuccess = onSuccess; + this._onError = onError; } - /** - * Wrap the given observer to track connection's initialization state. Connection is closed by the server if - * processing of INIT message fails so returned observer will handle initialization failure as a fatal error. - * @param {StreamObserver} observer the observer used for INIT message. - * @return {StreamObserver} updated observer. - */ - wrap(observer) { - return { - onNext: record => { - if (observer && observer.onNext) { - observer.onNext(record); - } - }, - onError: error => { - this._processFailure(error); - - this._connection._updateCurrentObserver(); // make sure this same observer will not be called again - try { - if (observer && observer.onError) { - observer.onError(error); - } - } finally { - this._connection._handleFatalError(error); - } - }, - onCompleted: metaData => { - this._connection._markInitialized(metaData); - this._resolveInitPromise(this._connection); - - if (observer && observer.onCompleted) { - observer.onCompleted(metaData); - } - } - }; + onNext(record) { + this.onError(newError('Received RECORD when initializing ' + JSON.stringify(record))); } - /** - * Get promise resolved when connection initialization succeed or rejected when it fails. - * @return {Promise} the result of connection initialization. - */ - initializationCompleted() { - this._initRequested = true; - - if (this._initError) { - const error = this._initError; - this._initError = null; // to reject initPromise only once - this._rejectInitPromise(error); - } + onError(error) { + this._connection._updateCurrentObserver(); // make sure this exact observer will not be called again + this._connection._handleFatalError(error); // initialization errors are fatal - return this._initPromise; + this._onError(error); } - /** - * @private - */ - _processFailure(error) { - if (this._initRequested) { - // someone is waiting for initialization to complete, reject the promise - this._rejectInitPromise(error); - } else { - // no one is waiting for initialization, memorize the error but do not reject the promise - // to avoid unnecessary unhandled promise rejection warnings - this._initError = error; + onCompleted(metadata) { + // read server version from the response metadata + const serverVersion = metadata ? metadata.server : null; + if (!this._connection.server.version) { + this._connection.server.version = serverVersion; + const version = ServerVersion.fromString(serverVersion); + if (version.compareTo(VERSION_3_2_0) < 0) { + this._connection.protocol().packer().disableByteArrays(); + } } - } -} -/** - * Crete new connection to the provided address. - * @access private - * @param {string} hostPort - the Bolt endpoint to connect to - * @param {object} config - this driver configuration - * @param {string=null} connectionErrorCode - error code for errors raised on connection errors - * @param {Logger} log - configured logger - * @return {Connection} - New connection - */ -function connect(hostPort, config = {}, connectionErrorCode = null, log = Logger.noOp()) { - const Ch = config.channel || Channel; - const parsedAddress = urlUtil.parseDatabaseUrl(hostPort); - const channelConfig = new ChannelConfig(parsedAddress, config, connectionErrorCode); - return new Connection(new Ch(channelConfig), parsedAddress.hostAndPort, log, config.disableLosslessIntegers); + this._onSuccess(this._connection); + } } - -export { - connect, - Connection -}; diff --git a/src/v1/internal/http/http-driver.js b/src/v1/internal/http/http-driver.js index 08151c646..c324e6a5a 100644 --- a/src/v1/internal/http/http-driver.js +++ b/src/v1/internal/http/http-driver.js @@ -29,7 +29,7 @@ export default class HttpDriver extends Driver { } session() { - return new HttpSession(this._hostPort, this._token, this._config, this._sessionTracker); + return new HttpSession(this._hostPort, this._authToken, this._config, this._sessionTracker); } close() { diff --git a/src/v1/internal/pool.js b/src/v1/internal/pool.js index aece79eec..75b0281c2 100644 --- a/src/v1/internal/pool.js +++ b/src/v1/internal/pool.js @@ -22,8 +22,9 @@ import {newError} from '../error'; import Logger from './logger'; class Pool { + /** - * @param {function} create an allocation function that creates a new resource. It's given + * @param {function(function): Promise} create an allocation function that creates a promise with a new resource. It's given * a single argument, a function that will return the resource to * the pool if invoked, which is meant to be called on .dispose * or .close or whatever mechanism the resource uses to finalize. @@ -54,33 +55,46 @@ class Pool { * @return {object} resource that is ready to use. */ acquire(key) { - const resource = this._acquire(key); + return this._acquire(key).then(resource => { + if (resource) { + resourceAcquired(key, this._activeResourceCounts); + if (this._log.isDebugEnabled()) { + this._log.debug(`${resource} acquired from the pool`); + } + return resource; + } - if (resource) { - resourceAcquired(key, this._activeResourceCounts); - if (this._log.isDebugEnabled()) { - this._log.debug(`${resource} acquired from the pool`); + // We're out of resources and will try to acquire later on when an existing resource is released. + const allRequests = this._acquireRequests; + const requests = allRequests[key]; + if (!requests) { + allRequests[key] = []; } - return Promise.resolve(resource); - } - // We're out of resources and will try to acquire later on when an existing resource is released. - const allRequests = this._acquireRequests; - const requests = allRequests[key]; - if (!requests) { - allRequests[key] = []; - } + return new Promise((resolve, reject) => { + let request; + + const timeoutId = setTimeout(() => { + // acquisition timeout fired - return new Promise((resolve, reject) => { - let request; + // remove request from the queue of pending requests, if it's still there + // request might've been taken out by the release operation + const pendingRequests = allRequests[key]; + if (pendingRequests) { + allRequests[key] = pendingRequests.filter(item => item !== request); + } - const timeoutId = setTimeout(() => { - allRequests[key] = allRequests[key].filter(item => item !== request); - reject(newError(`Connection acquisition timed out in ${this._acquisitionTimeout} ms.`)); - }, this._acquisitionTimeout); + if (request.isCompleted()) { + // request already resolved/rejected by the release operation; nothing to do + } else { + // request is still pending and needs to be failed + request.reject(newError(`Connection acquisition timed out in ${this._acquisitionTimeout} ms.`)); + } + }, this._acquisitionTimeout); - request = new PendingRequest(resolve, timeoutId, this._log); - allRequests[key].push(request); + request = new PendingRequest(resolve, reject, timeoutId, this._log); + allRequests[key].push(request); + }); }); } @@ -133,14 +147,14 @@ class Pool { if (this._validate(resource)) { // idle resource is valid and can be acquired - return resource; + return Promise.resolve(resource); } else { this._destroy(resource); } } if (this._maxSize && this.activeResourceCount(key) >= this._maxSize) { - return null; + return Promise.resolve(null); } // there exist no idle valid resources, create a new one for acquisition @@ -172,19 +186,37 @@ class Pool { } resourceReleased(key, this._activeResourceCounts); - // check if there are any pending requests + this._processPendingAcquireRequests(key); + } + + _processPendingAcquireRequests(key) { const requests = this._acquireRequests[key]; if (requests) { - const pending = requests[0]; - - if (pending) { - const resource = this._acquire(key); - if (resource) { - // managed to acquire a valid resource from the pool to satisfy the pending acquire request - resourceAcquired(key, this._activeResourceCounts); // increment the active counter - requests.shift(); // forget the pending request - pending.resolve(resource); // resolve the pending request with the acquired resource - } + const pendingRequest = requests.shift(); // pop a pending acquire request + + if (pendingRequest) { + this._acquire(key) + .catch(error => { + // failed to acquire/create a new connection to resolve the pending acquire request + // propagate the error by failing the pending request + pendingRequest.reject(error); + return null; + }) + .then(resource => { + if (resource) { + // managed to acquire a valid resource from the pool + + if (pendingRequest.isCompleted()) { + // request has been completed, most likely failed by a timeout + // return the acquired resource back to the pool + this._release(key, resource); + } else { + // request is still pending and can be resolved with the newly acquired resource + resourceAcquired(key, this._activeResourceCounts); // increment the active counter + pendingRequest.resolve(resource); // resolve the pending request with the acquired resource + } + } + }); } else { delete this._acquireRequests[key]; } @@ -220,13 +252,24 @@ function resourceReleased(key, activeResourceCounts) { class PendingRequest { - constructor(resolve, timeoutId, log) { + constructor(resolve, reject, timeoutId, log) { this._resolve = resolve; + this._reject = reject; this._timeoutId = timeoutId; this._log = log; + this._completed = false; + } + + isCompleted() { + return this._completed; } resolve(resource) { + if (this._completed) { + return; + } + this._completed = true; + clearTimeout(this._timeoutId); if (this._log.isDebugEnabled()) { this._log.debug(`${resource} acquired from the pool`); @@ -234,6 +277,15 @@ class PendingRequest { this._resolve(resource); } + reject(error) { + if (this._completed) { + return; + } + this._completed = true; + + clearTimeout(this._timeoutId); + this._reject(error); + } } export default Pool diff --git a/src/v1/internal/protocol-handshaker.js b/src/v1/internal/protocol-handshaker.js index ca7ad2b40..276c9d2db 100644 --- a/src/v1/internal/protocol-handshaker.js +++ b/src/v1/internal/protocol-handshaker.js @@ -25,8 +25,6 @@ import BoltProtocolV2 from './bolt-protocol-v2'; const HTTP_MAGIC_PREAMBLE = 1213486160; // == 0x48545450 == "HTTP" const BOLT_MAGIC_PREAMBLE = 0x6060B017; -const LATEST_PROTOCOL_VERSION = 2; - export default class ProtocolHandshaker { /** @@ -45,14 +43,6 @@ export default class ProtocolHandshaker { this._log = log; } - /** - * Create the newest bolt protocol. - * @return {BoltProtocol} the protocol. - */ - createLatestProtocol() { - return this._createProtocolWithVersion(LATEST_PROTOCOL_VERSION); - } - /** * Write a Bolt handshake into the underlying network channel. */ diff --git a/src/v1/internal/request-message.js b/src/v1/internal/request-message.js index 7dd0b1662..a9255a7b2 100644 --- a/src/v1/internal/request-message.js +++ b/src/v1/internal/request-message.js @@ -27,10 +27,9 @@ const PULL_ALL = 0x3F; // 0011 1111 // PULL * export default class RequestMessage { - constructor(signature, fields, isInitializationMessage, toString) { + constructor(signature, fields, toString) { this.signature = signature; this.fields = fields; - this.isInitializationMessage = isInitializationMessage; this.toString = toString; } @@ -41,7 +40,7 @@ export default class RequestMessage { * @return {RequestMessage} new INIT message. */ static init(clientName, authToken) { - return new RequestMessage(INIT, [clientName, authToken], true, () => `INIT ${clientName} {...}`); + return new RequestMessage(INIT, [clientName, authToken], () => `INIT ${clientName} {...}`); } /** @@ -51,7 +50,7 @@ export default class RequestMessage { * @return {RequestMessage} new RUN message. */ static run(statement, parameters) { - return new RequestMessage(RUN, [statement, parameters], false, () => `RUN ${statement} ${JSON.stringify(parameters)}`); + return new RequestMessage(RUN, [statement, parameters], () => `RUN ${statement} ${JSON.stringify(parameters)}`); } /** @@ -72,5 +71,5 @@ export default class RequestMessage { } // constants for messages that never change -const PULL_ALL_MESSAGE = new RequestMessage(PULL_ALL, [], false, () => 'PULL_ALL'); -const RESET_MESSAGE = new RequestMessage(RESET, [], false, () => 'RESET'); +const PULL_ALL_MESSAGE = new RequestMessage(PULL_ALL, [], () => 'PULL_ALL'); +const RESET_MESSAGE = new RequestMessage(RESET, [], () => 'RESET'); diff --git a/src/v1/internal/routing-util.js b/src/v1/internal/routing-util.js index 56f2d2e6e..0cb9061b3 100644 --- a/src/v1/internal/routing-util.js +++ b/src/v1/internal/routing-util.js @@ -24,7 +24,6 @@ import {ServerVersion, VERSION_3_2_0} from './server-version'; const CALL_GET_SERVERS = 'CALL dbms.cluster.routing.getServers'; const CALL_GET_ROUTING_TABLE = 'CALL dbms.cluster.routing.getRoutingTable($context)'; const PROCEDURE_NOT_FOUND_CODE = 'Neo.ClientError.Procedure.ProcedureNotFound'; -const UNAUTHORIZED_CODE = 'Neo.ClientError.Security.Unauthorized'; export default class RoutingUtil { @@ -49,9 +48,6 @@ export default class RoutingUtil { throw newError( `Server at ${routerAddress} can't perform routing. Make sure you are connecting to a causal cluster`, SERVICE_UNAVAILABLE); - } else if (error.code === UNAUTHORIZED_CODE) { - // auth error is a sign of a configuration issue, rediscovery should not proceed - throw error; } else { // return nothing when failed to connect because code higher in the callstack is still able to retry with a // different session towards a different router diff --git a/src/v1/internal/stream-observer.js b/src/v1/internal/stream-observer.js index f95e04141..398276d56 100644 --- a/src/v1/internal/stream-observer.js +++ b/src/v1/internal/stream-observer.js @@ -29,18 +29,14 @@ import Record from '../record'; * @access private */ class StreamObserver { - /** - * @constructor - * @param errorTransformer optional callback to be used for adding additional logic on error - */ - constructor(errorTransformer = (err) => {return err}) { + + constructor() { this._fieldKeys = null; this._fieldLookup = null; this._queuedRecords = []; this._tail = null; this._error = null; this._hasFailed = false; - this._errorTransformer = errorTransformer; this._observer = null; this._conn = null; this._meta = {}; @@ -110,21 +106,19 @@ class StreamObserver { * @param {Object} error - An error object */ onError(error) { - if(this._hasFailed) { + if (this._hasFailed) { return; } this._hasFailed = true; - const transformedError = this._errorTransformer(error, this._conn); - - if( this._observer ) { - if( this._observer.onError ) { - this._observer.onError( transformedError ); + if (this._observer) { + if (this._observer.onError) { + this._observer.onError(error); } else { - console.log( transformedError ); + console.log(error); } } else { - this._error = transformedError; + this._error = error; } } diff --git a/src/v1/routing-driver.js b/src/v1/routing-driver.js index 159f05f4d..bf4c0ee56 100644 --- a/src/v1/routing-driver.js +++ b/src/v1/routing-driver.js @@ -17,12 +17,12 @@ * limitations under the License. */ -import Session from './session'; import {Driver} from './driver'; import {newError, SESSION_EXPIRED} from './error'; import {LoadBalancer} from './internal/connection-providers'; import LeastConnectedLoadBalancingStrategy, {LEAST_CONNECTED_STRATEGY_NAME} from './internal/least-connected-load-balancing-strategy'; import RoundRobinLoadBalancingStrategy, {ROUND_ROBIN_STRATEGY_NAME} from './internal/round-robin-load-balancing-strategy'; +import ConnectionErrorHandler from './internal/connection-error-handler'; /** * A driver that supports routing in a causal cluster. @@ -44,33 +44,24 @@ class RoutingDriver extends Driver { return new LoadBalancer(hostPort, this._routingContext, connectionPool, loadBalancingStrategy, driverOnErrorCallback, this._log); } - _createSession(mode, connectionProvider, bookmark, config) { - return new RoutingSession(mode, connectionProvider, bookmark, config, (error, conn) => { - if (!conn) { - // connection can be undefined if error happened before connection was acquired - return error; - } - - const hostPort = conn.hostPort; + _createConnectionErrorHandler() { + // connection errors mean SERVICE_UNAVAILABLE for direct driver but for routing driver they should only + // result in SESSION_EXPIRED because there might still exist other servers capable of serving the request + return new ConnectionErrorHandler(SESSION_EXPIRED, + (error, hostPort) => this._handleUnavailability(error, hostPort), + (error, hostPort) => this._handleWriteFailure(error, hostPort)); + } - if (error.code === SESSION_EXPIRED || isDatabaseUnavailable(error)) { - this._log.warn(`Routing driver ${this._id} will forget ${hostPort} because of an error ${error.code} '${error.message}'`); - this._connectionProvider.forget(hostPort); - return error; - } else if (isFailureToWrite(error)) { - this._log.warn(`Routing driver ${this._id} will forget writer ${hostPort} because of an error ${error.code} '${error.message}'`); - this._connectionProvider.forgetWriter(hostPort); - return newError('No longer possible to write to server at ' + hostPort, SESSION_EXPIRED); - } else { - return error; - } - }); + _handleUnavailability(error, hostPort) { + this._log.warn(`Routing driver ${this._id} will forget ${hostPort} because of an error ${error.code} '${error.message}'`); + this._connectionProvider.forget(hostPort); + return error; } - _connectionErrorCode() { - // connection errors mean SERVICE_UNAVAILABLE for direct driver but for routing driver they should only - // result in SESSION_EXPIRED because there might still exist other servers capable of serving the request - return SESSION_EXPIRED; + _handleWriteFailure(error, hostPort) { + this._log.warn(`Routing driver ${this._id} will forget writer ${hostPort} because of an error ${error.code} '${error.message}'`); + this._connectionProvider.forgetWriter(hostPort); + return newError('No longer possible to write to server at ' + hostPort, SESSION_EXPIRED); } /** @@ -102,33 +93,4 @@ function validateConfig(config) { return config; } -/** - * @private - */ -function isFailureToWrite(error) { - return error.code === 'Neo.ClientError.Cluster.NotALeader' || - error.code === 'Neo.ClientError.General.ForbiddenOnReadOnlyDatabase'; -} - -/** - * @private - */ -function isDatabaseUnavailable(error) { - return error.code === 'Neo.TransientError.General.DatabaseUnavailable'; -} - -/** - * @private - */ -class RoutingSession extends Session { - constructor(mode, connectionProvider, bookmark, config, onFailedConnection) { - super(mode, connectionProvider, bookmark, config); - this._onFailedConnection = onFailedConnection; - } - - _onRunFailure() { - return this._onFailedConnection; - } -} - export default RoutingDriver diff --git a/src/v1/session.js b/src/v1/session.js index e04425efa..ae038585b 100644 --- a/src/v1/session.js +++ b/src/v1/session.js @@ -68,7 +68,7 @@ class Session { } _run(statement, parameters, statementRunner) { - const streamObserver = new StreamObserver(this._onRunFailure()); + const streamObserver = new StreamObserver(); const connectionHolder = this._connectionHolderWithMode(this._mode); if (!this._hasTx) { connectionHolder.initializeConnection(); @@ -110,10 +110,8 @@ class Session { connectionHolder.initializeConnection(); this._hasTx = true; - return new Transaction(connectionHolder, () => { - this._hasTx = false; - }, - this._onRunFailure(), this._lastBookmark, this._updateBookmark.bind(this)); + const onTxClose = () => this._hasTx = false; + return new Transaction(connectionHolder, onTxClose.bind(this), this._lastBookmark, this._updateBookmark.bind(this)); } /** @@ -196,11 +194,6 @@ class Session { } } - //Can be overridden to add error callback on RUN - _onRunFailure() { - return (err) => {return err}; - } - _connectionHolderWithMode(mode) { if (mode === READ) { return this._readConnectionHolder; diff --git a/src/v1/transaction.js b/src/v1/transaction.js index 9d4a2cdf8..d7a1efc5f 100644 --- a/src/v1/transaction.js +++ b/src/v1/transaction.js @@ -32,11 +32,10 @@ class Transaction { * @constructor * @param {ConnectionHolder} connectionHolder - the connection holder to get connection from. * @param {function()} onClose - Function to be called when transaction is committed or rolled back. - * @param {function(error: Error): Error} errorTransformer callback use to transform error. * @param {Bookmark} bookmark bookmark for transaction begin. * @param {function(bookmark: Bookmark)} onBookmark callback invoked when new bookmark is produced. */ - constructor(connectionHolder, onClose, errorTransformer, bookmark, onBookmark) { + constructor(connectionHolder, onClose, bookmark, onBookmark) { this._connectionHolder = connectionHolder; const streamObserver = new _TransactionStreamObserver(this); @@ -46,7 +45,6 @@ class Transaction { this._state = _states.ACTIVE; this._onClose = onClose; - this._errorTransformer = errorTransformer; this._onBookmark = onBookmark; } @@ -117,7 +115,7 @@ class Transaction { /** Internal stream observer used for transactional results*/ class _TransactionStreamObserver extends StreamObserver { constructor(tx) { - super(tx._errorTransformer || ((err) => {return err})); + super(); this._tx = tx; } diff --git a/test/internal/bolt-protocol-v1.test.js b/test/internal/bolt-protocol-v1.test.js index 9ea272bcb..b0bb65304 100644 --- a/test/internal/bolt-protocol-v1.test.js +++ b/test/internal/bolt-protocol-v1.test.js @@ -150,5 +150,4 @@ describe('BoltProtocolV1', () => { function verifyMessage(expected, actual) { expect(actual.signature).toEqual(expected.signature); expect(actual.fields).toEqual(expected.fields); - expect(actual.isInitializationMessage).toEqual(expected.isInitializationMessage); } diff --git a/test/internal/connection-error-handler.test.js b/test/internal/connection-error-handler.test.js new file mode 100644 index 000000000..695708e34 --- /dev/null +++ b/test/internal/connection-error-handler.test.js @@ -0,0 +1,76 @@ +/** + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import ConnectionErrorHandler from '../../src/v1/internal/connection-error-handler'; +import {newError, SERVICE_UNAVAILABLE, SESSION_EXPIRED} from '../../src/v1/error'; + +describe('ConnectionErrorHandler', () => { + + it('should return error code', () => { + const code = 'Neo4j.Error.Hello'; + const handler = new ConnectionErrorHandler(code); + expect(code).toEqual(handler.errorCode()); + }); + + it('should handle and transform availability errors', () => { + const errors = []; + const hostPorts = []; + const transformedError = newError('Message', 'Code'); + const handler = new ConnectionErrorHandler(SERVICE_UNAVAILABLE, (error, hostPort) => { + errors.push(error); + hostPorts.push(hostPort); + return transformedError; + }); + + const error1 = newError('A', SERVICE_UNAVAILABLE); + const error2 = newError('B', SESSION_EXPIRED); + const error3 = newError('C', 'Neo.TransientError.General.DatabaseUnavailable'); + + [error1, error2, error3].forEach((error, idx) => { + const newTransformedError = handler.handleAndTransformError(error, 'localhost:' + idx); + expect(newTransformedError).toEqual(transformedError); + }); + + expect(errors).toEqual([error1, error2, error3]); + expect(hostPorts).toEqual(['localhost:0', 'localhost:1', 'localhost:2']); + }); + + it('should handle and transform failure to write errors', () => { + const errors = []; + const hostPorts = []; + const transformedError = newError('Message', 'Code'); + const handler = new ConnectionErrorHandler(SERVICE_UNAVAILABLE, null, (error, hostPort) => { + errors.push(error); + hostPorts.push(hostPort); + return transformedError; + }); + + const error1 = newError('A', 'Neo.ClientError.Cluster.NotALeader'); + const error2 = newError('B', 'Neo.ClientError.General.ForbiddenOnReadOnlyDatabase'); + + [error1, error2].forEach((error, idx) => { + const newTransformedError = handler.handleAndTransformError(error, 'localhost:' + idx); + expect(newTransformedError).toEqual(transformedError); + }); + + expect(errors).toEqual([error1, error2]); + expect(hostPorts).toEqual(['localhost:0', 'localhost:1']); + }); + +}); diff --git a/test/internal/connection-holder.test.js b/test/internal/connection-holder.test.js index 8a269f62e..bbed8823c 100644 --- a/test/internal/connection-holder.test.js +++ b/test/internal/connection-holder.test.js @@ -65,7 +65,6 @@ describe('ConnectionHolder', () => { connectionHolder.getConnection(new StreamObserver()).then(conn => { expect(conn).toBe(connection); - verifyConnectionInitialized(conn); done(); }); }); @@ -79,24 +78,22 @@ describe('ConnectionHolder', () => { connectionHolder.initializeConnection(); connectionHolder.getConnection(streamObserver).then(conn => { - verifyConnectionInitialized(conn); verifyConnection(streamObserver, 'Neo4j/9.9.9'); done(); }); }); - it('should make stream observer aware about connection when initialization fails', done => { - const connection = new FakeConnection().withServerVersion('Neo4j/7.7.7').withFailedInitialization(new Error('Oh!')); - const connectionProvider = newSingleConnectionProvider(connection); + it('should propagate connection acquisition failure', done => { + const errorMessage = 'Failed to acquire or initialize the connection'; + const connectionPromise = Promise.reject(new Error(errorMessage)); + const connectionProvider = newSingleConnectionProvider(connectionPromise); const connectionHolder = new ConnectionHolder(READ, connectionProvider); const streamObserver = new StreamObserver(); connectionHolder.initializeConnection(); connectionHolder.getConnection(streamObserver).catch(error => { - expect(error.message).toEqual('Oh!'); - verifyConnectionInitialized(connection); - verifyConnection(streamObserver, 'Neo4j/7.7.7'); + expect(error.message).toEqual(errorMessage); done(); }); }); @@ -229,10 +226,6 @@ function newSingleConnectionProvider(connection) { return new SingleConnectionProvider(Promise.resolve(connection)); } -function verifyConnectionInitialized(connection) { - expect(connection.initializationInvoked).toEqual(1); -} - function verifyConnection(streamObserver, expectedServerVersion) { expect(streamObserver._conn).toBeDefined(); expect(streamObserver._conn).not.toBeNull(); diff --git a/test/internal/connection-providers.test.js b/test/internal/connection-providers.test.js index 32d42eb67..58d99d132 100644 --- a/test/internal/connection-providers.test.js +++ b/test/internal/connection-providers.test.js @@ -1095,7 +1095,7 @@ function setupLoadBalancerToRememberRouters(loadBalancer, routersArray) { } function newPool() { - return new Pool(FakeConnection.create); + return new Pool((address, release) => Promise.resolve(new FakeConnection(address, release))); } function expectRoutingTable(loadBalancer, routers, readers, writers) { @@ -1122,14 +1122,6 @@ class FakeConnection { this.address = address; this.release = release; } - - static create(address, release) { - return new FakeConnection(address, release); - } - - initializationCompleted() { - return Promise.resolve(this); - } } class FakeRediscovery { diff --git a/test/internal/connection.test.js b/test/internal/connection.test.js new file mode 100644 index 000000000..7b63e7b28 --- /dev/null +++ b/test/internal/connection.test.js @@ -0,0 +1,421 @@ +/** + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as DummyChannel from '../../src/v1/internal/ch-dummy'; +import Connection from '../../src/v1/internal/connection'; +import {Packer} from '../../src/v1/internal/packstream-v1'; +import {Chunker} from '../../src/v1/internal/chunking'; +import {alloc} from '../../src/v1/internal/buf'; +import {Neo4jError, newError, SERVICE_UNAVAILABLE} from '../../src/v1/error'; +import sharedNeo4j from '../internal/shared-neo4j'; +import {ServerVersion} from '../../src/v1/internal/server-version'; +import lolex from 'lolex'; +import Logger from '../../src/v1/internal/logger'; +import StreamObserver from '../../src/v1/internal/stream-observer'; +import RequestMessage from '../../src/v1/internal/request-message'; +import ConnectionErrorHandler from '../../src/v1/internal/connection-error-handler'; +import testUtils from '../internal/test-utils'; + +const ILLEGAL_MESSAGE = {signature: 42, fields: []}; +const SUCCESS_MESSAGE = {signature: 0x70, fields: [{}]}; +const FAILURE_MESSAGE = {signature: 0x7F, fields: [newError('Hello')]}; +const RECORD_MESSAGE = {signature: 0x71, fields: [{value: 'Hello'}]}; + +describe('Connection', () => { + + let clock; + let connection; + + afterEach(done => { + if (clock) { + clock.uninstall(); + clock = null; + } + + const usedConnection = connection; + connection = null; + if (usedConnection) { + usedConnection.close(); + } + done(); + }); + + it('should have correct creation timestamp', () => { + clock = lolex.install(); + clock.setSystemTime(424242); + + connection = createConnection('bolt://localhost'); + + expect(connection.creationTimestamp).toEqual(424242); + }); + + it('should read/write basic messages', done => { + connection = createConnection('bolt://localhost'); + + connection._negotiateProtocol().then(() => { + connection.protocol().initialize('mydriver/0.0.0', basicAuthToken(), { + onCompleted: msg => { + expect(msg).not.toBeNull(); + done(); + }, + onError: console.log + }); + }); + }); + + it('should retrieve stream', done => { + connection = createConnection('bolt://localhost'); + + const records = []; + const pullAllObserver = { + onNext: record => { + records.push(record); + }, + onCompleted: () => { + expect(records[0][0]).toBe(1); + done(); + } + }; + + connection._negotiateProtocol().then(() => { + connection.protocol().initialize('mydriver/0.0.0', basicAuthToken()); + connection.write(RequestMessage.run('RETURN 1.0', {}), {}, false); + connection.write(RequestMessage.pullAll(), pullAllObserver, true); + }); + }); + + it('should write protocol handshake', () => { + const observer = DummyChannel.observer; + connection = createConnection('bolt://localhost', {channel: DummyChannel.channel}); + + connection._negotiateProtocol(); + + const boltMagicPreamble = '60 60 b0 17'; + const protocolVersion2 = '00 00 00 02'; + const protocolVersion1 = '00 00 00 01'; + const noProtocolVersion = '00 00 00 00'; + expect(observer.instance.toHex()).toBe(`${boltMagicPreamble} ${protocolVersion2} ${protocolVersion1} ${noProtocolVersion} ${noProtocolVersion} `); + }); + + it('should provide error message when connecting to http-port', done => { + connection = createConnection('bolt://localhost:7474', {encrypted: false}); + + connection.connect('mydriver/0.0.0', basicAuthToken()).catch(error => { + expect(error).toBeDefined(); + expect(error).not.toBeNull(); + + if (testUtils.isServer()) { + //only node gets the pretty error message + expect(error.message).toBe('Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' + + '(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)'); + } + + done(); + }); + }); + + it('should convert failure messages to errors', done => { + const channel = new DummyChannel.channel; + connection = new Connection(channel, new ConnectionErrorHandler(SERVICE_UNAVAILABLE), 'localhost:7687', Logger.noOp()); + + connection._negotiateProtocol(); + + const errorCode = 'Neo.ClientError.Schema.ConstraintValidationFailed'; + const errorMessage = 'Node 0 already exists with label User and property "email"=[john@doe.com]'; + + connection._queueObserver({ + onError: error => { + expectNeo4jError(error, errorCode, errorMessage); + done(); + } + }); + + channel.onmessage(packedHandshakeMessage()); + channel.onmessage(packedFailureMessage(errorCode, errorMessage)); + }); + + it('should notify when connection initialization completes', done => { + connection = createConnection('bolt://localhost'); + + connection.connect('mydriver/0.0.0', basicAuthToken()) + .then(initializedConnection => { + expect(initializedConnection).toBe(connection); + done(); + }); + }); + + it('should notify when connection initialization fails', done => { + connection = createConnection('bolt://localhost:7474'); // wrong port + + connection.connect('mydriver/0.0.0', basicAuthToken()) + .then(() => done.fail('Should not initialize')) + .catch(error => { + expect(error).toBeDefined(); + done(); + }); + }); + + it('should have server version after connection initialization completed', done => { + connection = createConnection('bolt://localhost'); + + connection.connect('mydriver/0.0.0', basicAuthToken()) + .then(initializedConnection => { + expect(initializedConnection).toBe(connection); + const serverVersion = ServerVersion.fromString(connection.server.version); + expect(serverVersion).toBeDefined(); + done(); + }); + }); + + it('should fail all new observers after failure to connect', done => { + connection = createConnection('bolt://localhost:7474'); // wrong port + + connection.connect('mydriver/0.0.0', basicAuthToken()) + .then(() => done.fail('Should not connect')) + .catch(initialError => { + expect(initialError).toBeDefined(); + expect(initialError).not.toBeNull(); + + expect(connection.isOpen()).toBeFalsy(); + + const streamObserver = new StreamObserver(); + streamObserver.subscribe({ + onError: error => { + expect(error).toEqual(initialError); + done(); + } + }); + connection._queueObserver(streamObserver); + }); + }); + + it('should respect connection timeout', done => { + testConnectionTimeout(false, done); + }); + + it('should respect encrypted connection timeout', done => { + testConnectionTimeout(true, done); + }); + + it('should not queue INIT observer when broken', done => { + testQueueingOfObserversWithBrokenConnection(connection => connection.protocol().initialize('Hello', {}, {}), done); + }); + + it('should not queue RUN observer when broken', done => { + testQueueingOfObserversWithBrokenConnection(connection => connection.protocol().run('RETURN 1', {}, {}), done); + }); + + it('should not queue RESET observer when broken', done => { + const resetAction = connection => connection.resetAndFlush().catch(ignore => { + }); + + testQueueingOfObserversWithBrokenConnection(resetAction, done); + }); + + it('should reset and flush when SUCCESS received', done => { + connection = createConnection('bolt://localhost'); + + connection.connect('my-driver/1.2.3', basicAuthToken()) + .then(() => { + connection.resetAndFlush().then(() => { + expect(connection.isOpen()).toBeTruthy(); + done(); + }).catch(error => done.fail(error)); + + // write a SUCCESS message for RESET before the actual response is received + connection._handleMessage(SUCCESS_MESSAGE); + // enqueue a dummy observer to handle the real SUCCESS message + connection._queueObserver({ + onCompleted: () => { + } + }); + }); + }); + + it('should fail to reset and flush when FAILURE received', done => { + connection = createConnection('bolt://localhost'); + + connection.connect('my-driver/1.2.3', basicAuthToken()) + .then(() => { + connection.resetAndFlush() + .then(() => done.fail('Should fail')) + .catch(error => { + expect(error.message).toEqual('Received FAILURE as a response for RESET: Neo4jError: Hello'); + expect(connection._isBroken).toBeTruthy(); + expect(connection.isOpen()).toBeFalsy(); + done(); + }); + + // write a FAILURE message for RESET before the actual response is received + connection._handleMessage(FAILURE_MESSAGE); + // enqueue a dummy observer to handle the real SUCCESS message + connection._queueObserver({ + onCompleted: () => { + } + }); + }); + }); + + it('should fail to reset and flush when RECORD received', done => { + connection = createConnection('bolt://localhost'); + + connection.connect('my-driver/1.2.3', basicAuthToken()) + .then(() => { + connection.resetAndFlush() + .then(() => done.fail('Should fail')) + .catch(error => { + expect(error.message).toEqual('Received RECORD as a response for RESET: {"value":"Hello"}'); + expect(connection._isBroken).toBeTruthy(); + expect(connection.isOpen()).toBeFalsy(); + done(); + }); + + // write a RECORD message for RESET before the actual response is received + connection._handleMessage(RECORD_MESSAGE); + // enqueue a dummy observer to handle the real SUCCESS message + connection._queueObserver({ + onCompleted: () => { + } + }); + }); + }); + + it('should acknowledge failure with RESET when SUCCESS received', done => { + connection = createConnection('bolt://localhost'); + + connection.connect('my-driver/1.2.3', basicAuthToken()) + .then(() => { + connection._currentFailure = newError('Hello'); + connection._resetOnFailure(); + + // write a SUCCESS message for RESET before the actual response is received + connection._handleMessage(SUCCESS_MESSAGE); + // enqueue a dummy observer to handle the real SUCCESS message + connection._queueObserver({ + onCompleted: () => { + } + }); + + expect(connection._currentFailure).toBeNull(); + done(); + }); + }); + + it('should handle and transform fatal errors', done => { + const errors = []; + const hostPorts = []; + const transformedError = newError('Message', 'Code'); + const errorHandler = new ConnectionErrorHandler(SERVICE_UNAVAILABLE, (error, hostPort) => { + errors.push(error); + hostPorts.push(hostPort); + return transformedError; + }); + + connection = Connection.create('bolt://localhost', {}, errorHandler, Logger.noOp()); + + connection._queueObserver({ + onError: error => { + expect(error).toEqual(transformedError); + expect(errors.length).toEqual(1); + expect(errors[0].code).toEqual(SERVICE_UNAVAILABLE); + expect(hostPorts).toEqual([connection.hostPort]); + done(); + } + }); + + connection._handleFatalError(newError('Hello', SERVICE_UNAVAILABLE)); + }); + + function packedHandshakeMessage() { + const result = alloc(4); + result.putInt32(0, 1); + result.reset(); + return result; + } + + function packedFailureMessage(code, message) { + const channel = new DummyChannel.channel; + const chunker = new Chunker(channel); + const packer = new Packer(chunker); + packer.packStruct(0x7F, [packer.packable({code: code, message: message})]); + chunker.messageBoundary(); + chunker.flush(); + const data = channel.toBuffer(); + const result = alloc(data.length); + result.putBytes(0, data); + return result; + } + + function expectNeo4jError(error, expectedCode, expectedMessage) { + expect(() => { + throw error; + }).toThrow(new Neo4jError(expectedMessage, expectedCode)); + expect(error.name).toBe('Neo4jError'); + } + + function basicAuthToken() { + return { + scheme: 'basic', + principal: sharedNeo4j.username, + credentials: sharedNeo4j.password + }; + } + + function testConnectionTimeout(encrypted, done) { + const boltUri = 'bolt://10.0.0.0'; // use non-routable IP address which never responds + connection = createConnection(boltUri, {encrypted: encrypted, connectionTimeout: 1000}, 'TestErrorCode'); + + connection.connect('mydriver/0.0.0', basicAuthToken()) + .then(() => done.fail('Should not be able to connect')) + .catch(error => { + expect(error.code).toEqual('TestErrorCode'); + + // in some environments non-routable address results in immediate 'connection refused' error and connect + // timeout is not fired; skip message assertion for such cases, it is important for connect attempt to not hang + if (error.message.indexOf('Failed to establish connection') === 0) { + expect(error.message).toEqual('Failed to establish connection in 1000ms'); + } + + done(); + }); + } + + function testQueueingOfObserversWithBrokenConnection(connectionAction, done) { + connection = createConnection('bolt://localhost'); + + connection._negotiateProtocol().then(() => { + connection._handleMessage(ILLEGAL_MESSAGE); + expect(connection.isOpen()).toBeFalsy(); + + expect(connection._pendingObservers.length).toEqual(0); + connectionAction(connection); + expect(connection._pendingObservers.length).toEqual(0); + + done(); + }); + } + + /** + * @return {Connection} + */ + function createConnection(url, config, errorCode = null) { + return Connection.create(url, config || {}, new ConnectionErrorHandler(errorCode || SERVICE_UNAVAILABLE), Logger.noOp()); + } + +}); diff --git a/test/internal/connector.test.js b/test/internal/connector.test.js deleted file mode 100644 index 1ad0c9c9e..000000000 --- a/test/internal/connector.test.js +++ /dev/null @@ -1,388 +0,0 @@ -/** - * Copyright (c) 2002-2018 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import * as DummyChannel from '../../src/v1/internal/ch-dummy'; -import {connect, Connection} from '../../src/v1/internal/connector'; -import {Packer} from '../../src/v1/internal/packstream-v1'; -import {Chunker} from '../../src/v1/internal/chunking'; -import {alloc} from '../../src/v1/internal/buf'; -import {Neo4jError, newError} from '../../src/v1/error'; -import sharedNeo4j from '../internal/shared-neo4j'; -import {ServerVersion} from '../../src/v1/internal/server-version'; -import lolex from 'lolex'; -import Logger from '../../src/v1/internal/logger'; -import StreamObserver from '../../src/v1/internal/stream-observer'; -import RequestMessage from '../../src/v1/internal/request-message'; - -const ILLEGAL_MESSAGE = {signature: 42, fields: []}; -const SUCCESS_MESSAGE = {signature: 0x70, fields: [{}]}; -const FAILURE_MESSAGE = {signature: 0x7F, fields: [newError('Hello')]}; -const RECORD_MESSAGE = {signature: 0x71, fields: [{value: 'Hello'}]}; - -describe('connector', () => { - - let clock; - let connection; - - afterEach(done => { - if (clock) { - clock.uninstall(); - clock = null; - } - - const usedConnection = connection; - connection = null; - if (usedConnection) { - usedConnection.close(); - } - done(); - }); - - it('should have correct creation timestamp', () => { - clock = lolex.install(); - clock.setSystemTime(424242); - - connection = connect('bolt://localhost'); - - expect(connection.creationTimestamp).toEqual(424242); - }); - - it('should read/write basic messages', done => { - // Given - connection = connect("bolt://localhost"); - - // When - connection.protocol().initialize('mydriver/0.0.0', basicAuthToken(), { - onCompleted: msg => { - expect(msg).not.toBeNull(); - done(); - }, - onError: console.log - }); - }); - - it('should retrieve stream', done => { - // Given - connection = connect("bolt://localhost"); - - // When - const records = []; - const pullAllObserver = { - onNext: record => { - records.push(record); - }, - onCompleted: () => { - expect(records[0][0]).toBe(1); - done(); - } - }; - - connection.protocol().initialize('mydriver/0.0.0', basicAuthToken()); - connection.write(RequestMessage.run('RETURN 1.0', {}), {}, false); - connection.write(RequestMessage.pullAll(), pullAllObserver, true); - }); - - it('should use DummyChannel to read what gets written', done => { - // Given - const observer = DummyChannel.observer; - connection = connect('bolt://localhost', {channel: DummyChannel.channel}); - - const boltMagicPreamble = '60 60 b0 17'; - const protocolVersion2 = '00 00 00 02'; - const protocolVersion1 = '00 00 00 01'; - const noProtocolVersion = '00 00 00 00'; - expect(observer.instance.toHex()).toBe(`${boltMagicPreamble} ${protocolVersion2} ${protocolVersion1} ${noProtocolVersion} ${noProtocolVersion} `); - - observer.instance.clear(); - - // When - connection.protocol().initialize('mydriver/0.0.0', basicAuthToken()); - connection.write(RequestMessage.run('RETURN 1', {}), {}, true); - expect(observer.instance.toHex()).toBe('00 44 b2 01 8e 6d 79 64 72 69 76 65 72 2f 30 2e 30 2e 30 a3 86 73 63 68 65 6d 65 85 62 61 73 69 63 89 70 72 69 6e 63 69 70 61 6c 85 6e 65 6f 34 6a 8b 63 72 65 64 65 6e 74 69 61 6c 73 88 70 61 73 73 77 6f 72 64 00 00 00 0c b2 10 88 52 45 54 55 52 4e 20 31 a0 00 00 '); - done(); - }); - - it('should provide error message when connecting to http-port', done => { - // Given - connection = connect("bolt://localhost:7474", {encrypted: false}); - - // When - connection.protocol().initialize('mydriver/0.0.0', basicAuthToken(), { - onCompleted: msg => { - }, - onError: err => { - //only node gets the pretty error message - if (require('../../lib/v1/internal/ch-node.js').available) { - expect(err.message).toBe("Server responded HTTP. Make sure you are not trying to connect to the http endpoint " + - "(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)"); - } - done(); - } - }); - }); - - it('should convert failure messages to errors', done => { - const channel = new DummyChannel.channel; - connection = new Connection(channel, 'bolt://localhost', Logger.noOp()); - - const errorCode = 'Neo.ClientError.Schema.ConstraintValidationFailed'; - const errorMessage = 'Node 0 already exists with label User and property "email"=[john@doe.com]'; - - connection._queueObserver({ - onError: error => { - expectNeo4jError(error, errorCode, errorMessage); - done(); - } - }); - - channel.onmessage(packedHandshakeMessage()); - channel.onmessage(packedFailureMessage(errorCode, errorMessage)); - }); - - it('should notify when connection initialization completes', done => { - connection = connect('bolt://localhost'); - - connection.initializationCompleted().then(initializedConnection => { - expect(initializedConnection).toBe(connection); - done(); - }); - - connection.protocol().initialize('mydriver/0.0.0', basicAuthToken()); - }); - - it('should notify when connection initialization fails', done => { - connection = connect('bolt://localhost:7474'); // wrong port - - connection.initializationCompleted().catch(error => { - expect(error).toBeDefined(); - done(); - }); - - connection.protocol().initialize('mydriver/0.0.0', basicAuthToken()); - }); - - it('should notify provided observer when connection initialization completes', done => { - connection = connect('bolt://localhost'); - - connection.protocol().initialize('mydriver/0.0.0', basicAuthToken(), { - onCompleted: metaData => { - expect(connection.isOpen()).toBeTruthy(); - expect(metaData).toBeDefined(); - done(); - }, - }); - }); - - it('should notify provided observer when connection initialization fails', done => { - connection = connect('bolt://localhost:7474'); // wrong port - - connection.protocol().initialize('mydriver/0.0.0', basicAuthToken(), { - onError: error => { - expect(connection.isOpen()).toBeFalsy(); - expect(error).toBeDefined(); - done(); - }, - }); - }); - - it('should have server version after connection initialization completed', done => { - connection = connect('bolt://localhost'); - - connection.initializationCompleted().then(initializedConnection => { - const serverVersion = ServerVersion.fromString(initializedConnection.server.version); - expect(serverVersion).toBeDefined(); - done(); - }); - - connection.protocol().initialize('mydriver/0.0.0', basicAuthToken()); - }); - - it('should fail all new observers after initialization error', done => { - connection = connect('bolt://localhost:7474'); // wrong port - - connection.protocol().initialize('mydriver/0.0.0', basicAuthToken(), { - onError: initialError => { - expect(initialError).toBeDefined(); - - const streamObserver = new StreamObserver(); - streamObserver.subscribe({ - onError: error1 => { - expect(error1).toEqual(initialError); - - connection.protocol().initialize('mydriver/0.0.0', basicAuthToken(), { - onError: error2 => { - expect(error2).toEqual(initialError); - - done(); - } - }); - } - }); - - connection.protocol().run('RETURN 1', {}, streamObserver); - }, - }); - }); - - it('should respect connection timeout', done => { - testConnectionTimeout(false, done); - }); - - it('should respect encrypted connection timeout', done => { - testConnectionTimeout(true, done); - }); - - it('should not queue INIT observer when broken', () => { - testQueueingOfObserversWithBrokenConnection(connection => connection.protocol().initialize('Hello', {}, {})); - }); - - it('should not queue RUN observer when broken', () => { - testQueueingOfObserversWithBrokenConnection(connection => connection.protocol().run('RETURN 1', {}, {})); - }); - - it('should not queue RESET observer when broken', () => { - const resetAction = connection => connection.resetAndFlush().catch(ignore => { - }); - - testQueueingOfObserversWithBrokenConnection(resetAction); - }); - - it('should reset and flush when SUCCESS received', done => { - connection = connect('bolt://localhost'); - - connection.resetAndFlush().then(() => { - expect(connection.isOpen()).toBeTruthy(); - done(); - }).catch(error => done.fail(error)); - - connection._handleMessage(SUCCESS_MESSAGE); - }); - - it('should fail to reset and flush when FAILURE received', done => { - connection = connect('bolt://localhost'); - - connection.resetAndFlush() - .then(() => done.fail('Should fail')) - .catch(error => { - expect(error.message).toEqual('Received FAILURE as a response for RESET: Neo4jError: Hello'); - expect(connection._isBroken).toBeTruthy(); - expect(connection.isOpen()).toBeFalsy(); - done(); - }); - - connection._handleMessage(FAILURE_MESSAGE); - }); - - it('should fail to reset and flush when RECORD received', done => { - connection = connect('bolt://localhost'); - - connection.resetAndFlush() - .then(() => done.fail('Should fail')) - .catch(error => { - expect(error.message).toEqual('Received RECORD as a response for RESET: {"value":"Hello"}'); - expect(connection._isBroken).toBeTruthy(); - expect(connection.isOpen()).toBeFalsy(); - done(); - }); - - connection._handleMessage(RECORD_MESSAGE); - }); - - it('should acknowledge failure with RESET when SUCCESS received', () => { - connection = connect('bolt://localhost'); - - connection._currentFailure = newError('Hello'); - connection._resetOnFailure(); - - connection._handleMessage(SUCCESS_MESSAGE); - expect(connection._currentFailure).toBeNull(); - }); - - function packedHandshakeMessage() { - const result = alloc(4); - result.putInt32(0, 1); - result.reset(); - return result; - } - - function packedFailureMessage(code, message) { - const channel = new DummyChannel.channel; - const chunker = new Chunker(channel); - const packer = new Packer(chunker); - packer.packStruct(0x7F, [packer.packable({code: code, message: message})]); - chunker.messageBoundary(); - chunker.flush(); - const data = channel.toBuffer(); - const result = alloc(data.length); - result.putBytes(0, data); - return result; - } - - function expectNeo4jError(error, expectedCode, expectedMessage) { - expect(() => { - throw error; - }).toThrow(new Neo4jError(expectedMessage, expectedCode)); - expect(error.name).toBe("Neo4jError"); - } - - function basicAuthToken() { - return { - scheme: 'basic', - principal: sharedNeo4j.username, - credentials: sharedNeo4j.password - }; - } - - function testConnectionTimeout(encrypted, done) { - const boltUri = 'bolt://10.0.0.0'; // use non-routable IP address which never responds - connection = connect(boltUri, {encrypted: encrypted, connectionTimeout: 1000}, 'TestErrorCode'); - - connection.protocol().initialize('mydriver/0.0.0', basicAuthToken(), { - onNext: record => { - done.fail('Should not receive records: ' + record); - }, - onCompleted: () => { - done.fail('Should not be able to INIT'); - }, - onError: error => { - expect(error.code).toEqual('TestErrorCode'); - - // in some environments non-routable address results in immediate 'connection refused' error and connect - // timeout is not fired; skip message assertion for such cases, it is important for connect attempt to not hang - if (error.message.indexOf('Failed to establish connection') === 0) { - expect(error.message).toEqual('Failed to establish connection in 1000ms'); - } - - done(); - } - }); - } - - function testQueueingOfObserversWithBrokenConnection(connectionAction) { - connection = connect('bolt://localhost'); - - connection._handleMessage(ILLEGAL_MESSAGE); - expect(connection.isOpen()).toBeFalsy(); - - expect(connection._pendingObservers.length).toEqual(0); - connectionAction(connection); - expect(connection._pendingObservers.length).toEqual(0); - } - -}); diff --git a/test/internal/fake-connection.js b/test/internal/fake-connection.js index 687b37055..f645ef7e8 100644 --- a/test/internal/fake-connection.js +++ b/test/internal/fake-connection.js @@ -32,12 +32,9 @@ export default class FakeConnection { this.resetInvoked = 0; this.releaseInvoked = 0; - this.initializationInvoked = 0; this.seenStatements = []; this.seenParameters = []; this.server = {}; - - this._initializationPromise = Promise.resolve(this); } protocol() { @@ -59,11 +56,6 @@ export default class FakeConnection { this.releaseInvoked++; } - initializationCompleted() { - this.initializationInvoked++; - return this._initializationPromise; - } - isOpen() { return this._open; } @@ -85,11 +77,6 @@ export default class FakeConnection { return this; } - withFailedInitialization(error) { - this._initializationPromise = Promise.reject(error); - return this; - } - withCreationTimestamp(value) { this.creationTimestamp = value; return this; diff --git a/test/internal/pool.test.js b/test/internal/pool.test.js index 09256ab0e..258d763b6 100644 --- a/test/internal/pool.test.js +++ b/test/internal/pool.test.js @@ -26,7 +26,7 @@ describe('Pool', () => { // Given let counter = 0; const key = 'bolt://localhost:7687'; - const pool = new Pool((url, release) => new Resource(url, counter++, release)); + const pool = new Pool((url, release) => Promise.resolve(new Resource(url, counter++, release))); // When const p0 = pool.acquire(key); @@ -49,7 +49,7 @@ describe('Pool', () => { // Given a pool that allocates let counter = 0; const key = 'bolt://localhost:7687'; - const pool = new Pool((url, release) => new Resource(url, counter++, release)); + const pool = new Pool((url, release) => Promise.resolve(new Resource(url, counter++, release))); // When const p0 = pool.acquire(key).then(r0 => { @@ -76,7 +76,7 @@ describe('Pool', () => { let counter = 0; const key1 = 'bolt://localhost:7687'; const key2 = 'bolt://localhost:7688'; - const pool = new Pool((url, release) => new Resource(url, counter++, release)); + const pool = new Pool((url, release) => Promise.resolve(new Resource(url, counter++, release))); // When const p0 = pool.acquire(key1); @@ -110,7 +110,7 @@ describe('Pool', () => { let destroyed = []; const key = 'bolt://localhost:7687'; const pool = new Pool( - (url, release) => new Resource(url, counter++, release), + (url, release) => Promise.resolve(new Resource(url, counter++, release)), resource => { destroyed.push(resource); }, @@ -144,7 +144,7 @@ describe('Pool', () => { let counter = 0; const key1 = 'bolt://localhost:7687'; const key2 = 'bolt://localhost:7688'; - const pool = new Pool((url, release) => new Resource(url, counter++, release), + const pool = new Pool((url, release) => Promise.resolve(new Resource(url, counter++, release)), res => { res.destroyed = true; return true; @@ -189,7 +189,7 @@ describe('Pool', () => { it('destroys resource when key was purged', (done) => { let counter = 0; const key = 'bolt://localhost:7687'; - const pool = new Pool((url, release) => new Resource(url, counter++, release), + const pool = new Pool((url, release) => Promise.resolve(new Resource(url, counter++, release)), res => { res.destroyed = true; return true; @@ -220,7 +220,7 @@ describe('Pool', () => { const key2 = 'bolt://localhost:7688'; const key3 = 'bolt://localhost:7689'; - const pool = new Pool((url, release) => new Resource(url, counter++, release), + const pool = new Pool((url, release) => Promise.resolve(new Resource(url, counter++, release)), res => { res.destroyed = true; return true; @@ -251,7 +251,7 @@ describe('Pool', () => { let validated = false; let counter = 0; const key = 'bolt://localhost:7687'; - const pool = new Pool((url, release) => new Resource(url, counter++, release), + const pool = new Pool((url, release) => Promise.resolve(new Resource(url, counter++, release)), res => { res.destroyed = true; return true; @@ -286,7 +286,7 @@ describe('Pool', () => { const existingKey = 'bolt://localhost:7687'; const absentKey = 'bolt://localhost:7688'; - const pool = new Pool((url, release) => new Resource(url, 42, release)); + const pool = new Pool((url, release) => Promise.resolve(new Resource(url, 42, release))); const p0 = pool.acquire(existingKey); const p1 = pool.acquire(existingKey); @@ -300,7 +300,7 @@ describe('Pool', () => { }); it('reports zero active resources when empty', () => { - const pool = new Pool((url, release) => new Resource(url, 42, release)); + const pool = new Pool((url, release) => Promise.resolve(new Resource(url, 42, release))); expect(pool.activeResourceCount('bolt://localhost:1')).toEqual(0); expect(pool.activeResourceCount('bolt://localhost:2')).toEqual(0); @@ -309,7 +309,7 @@ describe('Pool', () => { it('reports active resources', (done) => { const key = 'bolt://localhost:7687'; - const pool = new Pool((url, release) => new Resource(url, 42, release)); + const pool = new Pool((url, release) => Promise.resolve(new Resource(url, 42, release))); const p0 = pool.acquire(key); const p1 = pool.acquire(key); @@ -326,7 +326,7 @@ describe('Pool', () => { it('reports active resources when they are acquired', (done) => { const key = 'bolt://localhost:7687'; - const pool = new Pool((url, release) => new Resource(url, 42, release)); + const pool = new Pool((url, release) => Promise.resolve(new Resource(url, 42, release))); // three new resources are created and returned to the pool const p0 = pool.acquire(key); @@ -359,7 +359,7 @@ describe('Pool', () => { it('does not report resources that are returned to the pool', (done) => { const key = 'bolt://localhost:7687'; - const pool = new Pool((url, release) => new Resource(url, 42, release)); + const pool = new Pool((url, release) => Promise.resolve(new Resource(url, 42, release))); const p0 = pool.acquire(key); const p1 = pool.acquire(key); @@ -398,7 +398,7 @@ describe('Pool', () => { const key = 'bolt://localhost:7687'; const pool = new Pool( - (url, release) => new Resource(url, counter++, release), + (url, release) => Promise.resolve(new Resource(url, counter++, release)), resource => {}, resource => true, new PoolConfig(2, 5000) @@ -431,7 +431,7 @@ describe('Pool', () => { const key = 'bolt://localhost:7687'; const pool = new Pool( - (url, release) => new Resource(url, counter++, release), + (url, release) => Promise.resolve(new Resource(url, counter++, release)), resource => {}, resource => true, new PoolConfig(2, 1000) @@ -459,7 +459,7 @@ describe('Pool', () => { const key = 'bolt://localhost:7687'; const pool = new Pool( - (url, release) => new Resource(url, counter++, release), + (url, release) => Promise.resolve(new Resource(url, counter++, release)), resource => {}, resource => true ); @@ -487,7 +487,7 @@ describe('Pool', () => { const key = 'bolt://localhost:7687'; const pool = new Pool( - (url, release) => new Resource(url, counter++, release), + (url, release) => Promise.resolve(new Resource(url, counter++, release)), resource => { }, () => true, @@ -529,7 +529,7 @@ describe('Pool', () => { let counter = 0; const pool = new Pool( - (url, release) => new Resource(url, counter++, release), + (url, release) => Promise.resolve(new Resource(url, counter++, release)), resource => { }, resourceValidOnlyOnceValidationFunction, @@ -563,7 +563,7 @@ describe('Pool', () => { let counter = 0; const pool = new Pool( - (url, release) => new Resource(url, counter++, release), + (url, release) => Promise.resolve(new Resource(url, counter++, release)), resource => { }, resourceValidOnlyOnceValidationFunction, diff --git a/test/internal/protocol-handshaker.test.js b/test/internal/protocol-handshaker.test.js index 1b250cdec..eb238d856 100644 --- a/test/internal/protocol-handshaker.test.js +++ b/test/internal/protocol-handshaker.test.js @@ -24,16 +24,6 @@ import {alloc} from '../../src/v1/internal/buf'; describe('ProtocolHandshaker', () => { - it('should create latest protocol', () => { - const handshaker = new ProtocolHandshaker(null, null, null, false, Logger.noOp()); - - const protocol = handshaker.createLatestProtocol(); - - expect(protocol).toBeDefined(); - expect(protocol).not.toBeNull(); - expect(protocol instanceof BoltProtocol).toBeTruthy(); - }); - it('should write handshake request', () => { const writtenBuffers = []; const fakeChannel = { diff --git a/test/internal/request-message.test.js b/test/internal/request-message.test.js index 08eac5342..a21cce668 100644 --- a/test/internal/request-message.test.js +++ b/test/internal/request-message.test.js @@ -29,7 +29,6 @@ describe('RequestMessage', () => { expect(message.signature).toEqual(0x01); expect(message.fields).toEqual([clientName, authToken]); - expect(message.isInitializationMessage).toBeTruthy(); expect(message.toString()).toEqual(`INIT ${clientName} {...}`); }); @@ -41,7 +40,6 @@ describe('RequestMessage', () => { expect(message.signature).toEqual(0x10); expect(message.fields).toEqual([statement, parameters]); - expect(message.isInitializationMessage).toBeFalsy(); expect(message.toString()).toEqual(`RUN ${statement} ${JSON.stringify(parameters)}`); }); @@ -50,7 +48,6 @@ describe('RequestMessage', () => { expect(message.signature).toEqual(0x3F); expect(message.fields).toEqual([]); - expect(message.isInitializationMessage).toBeFalsy(); expect(message.toString()).toEqual('PULL_ALL'); }); @@ -59,7 +56,6 @@ describe('RequestMessage', () => { expect(message.signature).toEqual(0x0F); expect(message.fields).toEqual([]); - expect(message.isInitializationMessage).toBeFalsy(); expect(message.toString()).toEqual('RESET'); }); }); diff --git a/test/internal/shared-neo4j.js b/test/internal/shared-neo4j.js index eeb4dcd6b..22d11922a 100644 --- a/test/internal/shared-neo4j.js +++ b/test/internal/shared-neo4j.js @@ -112,7 +112,7 @@ const additionalConfig = { }; const neoCtrlVersionParam = '-e'; -const defaultNeo4jVersion = '3.4.5'; +const defaultNeo4jVersion = '3.4.6'; const defaultNeoCtrlArgs = `${neoCtrlVersionParam} ${defaultNeo4jVersion}`; function neo4jCertPath(dir) { diff --git a/test/internal/stream-observer.test.js b/test/internal/stream-observer.test.js index 1b9cca79e..994c733c8 100644 --- a/test/internal/stream-observer.test.js +++ b/test/internal/stream-observer.test.js @@ -142,9 +142,12 @@ describe('StreamObserver', () => { expect(receivedMetaData).toEqual({metaDataField1: 'value1', metaDataField2: 'value2'}); }); - it('invokes error transformer only once on error', () => { + it('invokes subscribed observer only once of error', () => { const errors = []; - const streamObserver = new StreamObserver(error => errors.push(error)); + const streamObserver = new StreamObserver(); + streamObserver.subscribe({ + onError: error => errors.push(error) + }); const error1 = new Error('Hello'); const error2 = new Error('World'); diff --git a/test/v1/session.test.js b/test/v1/session.test.js index fb2f24662..f5592efba 100644 --- a/test/v1/session.test.js +++ b/test/v1/session.test.js @@ -971,23 +971,24 @@ describe('session', () => { }); it('should acquire connection for transaction', done => { - expect(session.beginTransaction()).toBeDefined(); + expect(numberOfAcquiredConnectionsFromPool()).toEqual(0); - const otherSession1 = driver.session(); - expect(otherSession1.beginTransaction()).toBeDefined(); + session.beginTransaction().run('RETURN 1.0').then(result => { + expect(result.records[0].get(0)).toEqual(1); + expect(numberOfAcquiredConnectionsFromPool()).toEqual(1); - const otherSession2 = driver.session(); - expect(otherSession2.beginTransaction()).toBeDefined(); + driver.session().beginTransaction().run('RETURN 2.0').then(result => { + expect(result.records[0].get(0)).toEqual(2); + expect(numberOfAcquiredConnectionsFromPool()).toEqual(2); - const otherSession3 = driver.session(); - expect(otherSession3.beginTransaction()).toBeDefined(); + driver.session().beginTransaction().run('RETURN 3.0').then(result => { + expect(result.records[0].get(0)).toEqual(3); + expect(numberOfAcquiredConnectionsFromPool()).toEqual(3); - expect(numberOfAcquiredConnectionsFromPool()).toEqual(4); + driver.session().beginTransaction().run('RETURN 4.0').then(result => { + expect(result.records[0].get(0)).toEqual(4); + expect(numberOfAcquiredConnectionsFromPool()).toEqual(4); - session.close(() => { - otherSession1.close(() => { - otherSession2.close(() => { - otherSession3.close(() => { done(); }); });