Skip to content

Fixes around initial discovery #453

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Apr 30, 2019
22 changes: 12 additions & 10 deletions src/v1/driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,17 @@ class Driver {
/**
* You should not be calling this directly, instead use {@link driver}.
* @constructor
* @param {string} hostPort
* @param {ServerAddress} address
* @param {string} userAgent
* @param {object} authToken
* @param {object} config
* @protected
*/
constructor(hostPort, userAgent, authToken = {}, config = {}) {
constructor(address, userAgent, authToken = {}, config = {}) {
sanitizeConfig(config);

this._id = idGenerator++;
this._hostPort = hostPort;
this._address = address;
this._userAgent = userAgent;
this._openConnections = {};
this._authToken = authToken;
Expand Down Expand Up @@ -102,7 +102,7 @@ class Driver {
* @protected
*/
_afterConstruction() {
this._log.info(`Direct driver ${this._id} created for server address ${this._hostPort}`);
this._log.info(`Direct driver ${this._id} created for server address ${this._address}`);
}

/**
Expand Down Expand Up @@ -133,9 +133,9 @@ class Driver {
* @return {Promise<Connection>} promise resolved with a new connection or rejected when failed to connect.
* @access private
*/
_createConnection(hostPort, release) {
const connection = Connection.create(hostPort, this._config, this._createConnectionErrorHandler(), this._log);
connection._release = () => release(hostPort, connection);
_createConnection(address, release) {
const connection = Connection.create(address, this._config, this._createConnectionErrorHandler(), this._log);
connection._release = () => release(address, connection);
this._openConnections[connection.id] = connection;

return connection.connect(this._userAgent, this._authToken)
Expand All @@ -144,6 +144,8 @@ class Driver {
// notify Driver.onError callback about connection initialization errors
this.onError(error);
}
// let's destroy this connection
this._destroyConnection(connection);
// propagate the error because connection failed to connect / initialize
throw error;
});
Expand Down Expand Up @@ -206,8 +208,8 @@ class Driver {
}

// Extension point
_createConnectionProvider(hostPort, connectionPool, driverOnErrorCallback) {
return new DirectConnectionProvider(hostPort, connectionPool, driverOnErrorCallback);
_createConnectionProvider(address, connectionPool, driverOnErrorCallback) {
return new DirectConnectionProvider(address, connectionPool, driverOnErrorCallback);
}

// Extension point
Expand All @@ -218,7 +220,7 @@ class Driver {
_getOrCreateConnectionProvider() {
if (!this._connectionProvider) {
const driverOnErrorCallback = this._driverOnErrorCallback.bind(this);
this._connectionProvider = this._createConnectionProvider(this._hostPort, this._pool, driverOnErrorCallback);
this._connectionProvider = this._createConnectionProvider(this._address, this._pool, driverOnErrorCallback);
}
return this._connectionProvider;
}
Expand Down
5 changes: 3 additions & 2 deletions src/v1/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import urlUtil from './internal/url-util';
import HttpDriver from './internal/http/http-driver';
import {isPoint, Point} from './spatial-types';
import {Date, DateTime, Duration, isDate, isDateTime, isDuration, isLocalDateTime, isLocalTime, isTime, LocalDateTime, LocalTime, Time} from './temporal-types';
import ServerAddress from './internal/server-address';

/**
* @property {function(username: string, password: string, realm: ?string)} basic the function to create a
Expand Down Expand Up @@ -227,12 +228,12 @@ function driver(url, authToken, config = {}) {
assertString(url, 'Bolt URL');
const parsedUrl = urlUtil.parseDatabaseUrl(url);
if (parsedUrl.scheme === 'bolt+routing') {
return new RoutingDriver(parsedUrl.hostAndPort, parsedUrl.query, USER_AGENT, authToken, config);
return new RoutingDriver(ServerAddress.fromUrl(parsedUrl.hostAndPort), parsedUrl.query, USER_AGENT, authToken, config);
} else if (parsedUrl.scheme === 'bolt') {
if (!isEmptyObjectOrNull(parsedUrl.query)) {
throw new Error(`Parameters are not supported with scheme 'bolt'. Given URL: '${url}'`);
}
return new Driver(parsedUrl.hostAndPort, USER_AGENT, authToken, config);
return new Driver(ServerAddress.fromUrl(parsedUrl.hostAndPort), USER_AGENT, authToken, config);
} else if (parsedUrl.scheme === 'http' || parsedUrl.scheme === 'https') {
return new HttpDriver(parsedUrl, USER_AGENT, authToken, config);
} else {
Expand Down
23 changes: 11 additions & 12 deletions src/v1/internal/browser/browser-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export default class WebSocketChannel {
return;
}

this._ws = createWebSocket(scheme, config.url);
this._ws = createWebSocket(scheme, config.address);
this._ws.binaryType = "arraybuffer";

let self = this;
Expand Down Expand Up @@ -169,13 +169,13 @@ export default class WebSocketChannel {
}
}

function createWebSocket(scheme, parsedUrl) {
const url = scheme + '://' + parsedUrl.hostAndPort;
function createWebSocket(scheme, address) {
const url = scheme + '://' + address.asHostPort();

try {
return new WebSocket(url);
} catch (error) {
if (isIPv6AddressIssueOnWindows(error, parsedUrl)) {
if (isIPv6AddressIssueOnWindows(error, address)) {

// WebSocket in IE and Edge browsers on Windows do not support regular IPv6 address syntax because they contain ':'.
// It's an invalid character for UNC (https://en.wikipedia.org/wiki/IPv6_address#Literal_IPv6_addresses_in_UNC_path_names)
Expand All @@ -190,34 +190,33 @@ function createWebSocket(scheme, parsedUrl) {
// Creation of WebSocket with unconverted address results in SyntaxError without message or stacktrace.
// That is why here we "catch" SyntaxError and rewrite IPv6 address if needed.

const windowsFriendlyUrl = asWindowsFriendlyIPv6Address(scheme, parsedUrl);
const windowsFriendlyUrl = asWindowsFriendlyIPv6Address(scheme, address);
return new WebSocket(windowsFriendlyUrl);
} else {
throw error;
}
}
}

function isIPv6AddressIssueOnWindows(error, parsedUrl) {
return error.name === 'SyntaxError' && isIPv6Address(parsedUrl);
function isIPv6AddressIssueOnWindows(error, address) {
return error.name === 'SyntaxError' && isIPv6Address(address.asHostPort());
}

function isIPv6Address(parsedUrl) {
const hostAndPort = parsedUrl.hostAndPort;
function isIPv6Address(hostAndPort) {
return hostAndPort.charAt(0) === '[' && hostAndPort.indexOf(']') !== -1;
}

function asWindowsFriendlyIPv6Address(scheme, parsedUrl) {
function asWindowsFriendlyIPv6Address(scheme, address) {
// replace all ':' with '-'
const hostWithoutColons = parsedUrl.host.replace(new RegExp(':', 'g'), '-');
const hostWithoutColons = address.host().replace(new RegExp(':', 'g'), '-');

// replace '%' with 's' for link-local IPv6 address like 'fe80::1%lo0'
const hostWithoutPercent = hostWithoutColons.replace('%', 's');

// append magic '.ipv6-literal.net' suffix
const ipv6Host = hostWithoutPercent + '.ipv6-literal.net';

return `${scheme}://${ipv6Host}:${parsedUrl.port}`;
return `${scheme}://${ipv6Host}:${address.port()}`;
}

/**
Expand Down
6 changes: 3 additions & 3 deletions src/v1/internal/channel-config.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ export default class ChannelConfig {

/**
* @constructor
* @param {Url} url the URL for the channel to connect to.
* @param {ServerAddress} address the address for the channel to connect to.
* @param {object} driverConfig the driver config provided by the user when driver is created.
* @param {string} connectionErrorCode the default error code to use on connection errors.
*/
constructor(url, driverConfig, connectionErrorCode) {
this.url = url;
constructor(address, driverConfig, connectionErrorCode) {
this.address = address;
this.encrypted = extractEncrypted(driverConfig);
this.trust = extractTrust(driverConfig);
this.trustedCertificates = extractTrustedCertificates(driverConfig);
Expand Down
8 changes: 4 additions & 4 deletions src/v1/internal/connection-error-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ export default class ConnectionErrorHandler {
/**
* Handle and transform the error.
* @param {Neo4jError} error the original error.
* @param {string} hostPort the host and port of the connection where the error happened.
* @param {ServerAddress} address the address of the connection where the error happened.
* @return {Neo4jError} new error that should be propagated to the user.
*/
handleAndTransformError(error, hostPort) {
handleAndTransformError(error, address) {
if (isAvailabilityError(error)) {
return this._handleUnavailability(error, hostPort);
return this._handleUnavailability(error, address);
}
if (isFailureToWrite(error)) {
return this._handleWriteFailure(error, hostPort);
return this._handleWriteFailure(error, address);
}
return error;
}
Expand Down
28 changes: 21 additions & 7 deletions src/v1/internal/connection-providers.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import Session from '../session';
import RoutingTable from './routing-table';
import Rediscovery from './rediscovery';
import RoutingUtil from './routing-util';
import { HostNameResolver } from './node';

const UNAUTHORIZED_ERROR_CODE = 'Neo.ClientError.Security.Unauthorized';

Expand All @@ -45,32 +46,33 @@ class ConnectionProvider {

export class DirectConnectionProvider extends ConnectionProvider {

constructor(hostPort, connectionPool, driverOnErrorCallback) {
constructor(address, connectionPool, driverOnErrorCallback) {
super();
this._hostPort = hostPort;
this._address = address;
this._connectionPool = connectionPool;
this._driverOnErrorCallback = driverOnErrorCallback;
}

acquireConnection(mode) {
const connectionPromise = this._connectionPool.acquire(this._hostPort);
const connectionPromise = this._connectionPool.acquire(this._address);
return this._withAdditionalOnErrorCallback(connectionPromise, this._driverOnErrorCallback);
}
}

export class LoadBalancer extends ConnectionProvider {

constructor(hostPort, routingContext, connectionPool, loadBalancingStrategy, hostNameResolver, driverOnErrorCallback, log) {
constructor(address, routingContext, connectionPool, loadBalancingStrategy, hostNameResolver, driverOnErrorCallback, log) {
super();
this._seedRouter = hostPort;
this._seedRouter = address;
this._routingTable = new RoutingTable([this._seedRouter]);
this._rediscovery = new Rediscovery(new RoutingUtil(routingContext));
this._connectionPool = connectionPool;
this._driverOnErrorCallback = driverOnErrorCallback;
this._loadBalancingStrategy = loadBalancingStrategy;
this._hostNameResolver = hostNameResolver;
this._dnsResolver = new HostNameResolver();
this._log = log;
this._useSeedRouter = false;
this._useSeedRouter = true;
}

acquireConnection(accessMode) {
Expand Down Expand Up @@ -173,14 +175,26 @@ export class LoadBalancer extends ConnectionProvider {
}

_fetchRoutingTableUsingSeedRouter(seenRouters, seedRouter) {
const resolvedAddresses = this._hostNameResolver.resolve(seedRouter);
const resolvedAddresses = this._resolveSeedRouter(seedRouter);
return resolvedAddresses.then(resolvedRouterAddresses => {
// filter out all addresses that we've already tried
const newAddresses = resolvedRouterAddresses.filter(address => seenRouters.indexOf(address) < 0);
return this._fetchRoutingTable(newAddresses, null);
});
}

_resolveSeedRouter(seedRouter) {
const customResolution = this._hostNameResolver.resolve(seedRouter);
const dnsResolutions = customResolution.then(resolvedAddresses => {
return Promise.all(resolvedAddresses.map(address => {
return this._dnsResolver.resolve(address);
}));
});
return dnsResolutions.then(results => {
return [].concat.apply([], results);
});
}

_fetchRoutingTable(routerAddresses, routingTable) {
return routerAddresses.reduce((refreshedTablePromise, currentRouter, currentIndex) => {
return refreshedTablePromise.then(newRoutingTable => {
Expand Down
23 changes: 11 additions & 12 deletions src/v1/internal/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ export default class Connection {
* @constructor
* @param {Channel} channel - channel with a 'write' function and a 'onmessage' callback property.
* @param {ConnectionErrorHandler} errorHandler the error handler.
* @param {string} hostPort - the hostname and port to connect to.
* @param {ServerAddress} address - the server address to connect to.
* @param {Logger} log - the configured logger.
* @param {boolean} disableLosslessIntegers if this connection should convert all received integers to native JS numbers.
*/
constructor(channel, errorHandler, hostPort, log, disableLosslessIntegers = false) {
constructor(channel, errorHandler, address, log, disableLosslessIntegers = false) {
this.id = idGenerator++;
this.hostPort = hostPort;
this.server = {address: hostPort};
this.address = address;
this.server = { address: address.asHostPort() };
this.creationTimestamp = Date.now();
this._errorHandler = errorHandler;
this._disableLosslessIntegers = disableLosslessIntegers;
Expand All @@ -81,22 +81,21 @@ export default class Connection {
this._isBroken = false;

if (this._log.isDebugEnabled()) {
this._log.debug(`${this} created towards ${hostPort}`);
this._log.debug(`${this} created towards ${address}`);
}
}

/**
* Crete new connection to the provided address. Returned connection is not connected.
* @param {string} url - the Bolt endpoint to connect to.
* @param {ServerAddress} address - the Bolt endpoint to connect to.
* @param {object} config - this driver configuration.
* @param {ConnectionErrorHandler} errorHandler - the error handler for connection errors.
* @param {Logger} log - configured logger.
* @return {Connection} - new connection.
*/
static create(url, config, errorHandler, log) {
const parsedAddress = urlUtil.parseDatabaseUrl(url);
const channelConfig = new ChannelConfig(parsedAddress, config, errorHandler.errorCode());
return new Connection(new Channel(channelConfig), errorHandler, parsedAddress.hostAndPort, log, config.disableLosslessIntegers);
static create(address, config, errorHandler, log) {
const channelConfig = new ChannelConfig(address, config, errorHandler.errorCode());
return new Connection(new Channel(channelConfig), errorHandler, address, log, config.disableLosslessIntegers);
}

/**
Expand Down Expand Up @@ -217,7 +216,7 @@ export default class Connection {
*/
_handleFatalError(error) {
this._isBroken = true;
this._error = this._errorHandler.handleAndTransformError(error, this.hostPort);
this._error = this._errorHandler.handleAndTransformError(error, this.address);

if (this._log.isErrorEnabled()) {
this._log.error(`${this} experienced a fatal error ${JSON.stringify(this._error)}`);
Expand Down Expand Up @@ -267,7 +266,7 @@ export default class Connection {
}
try {
const error = newError(payload.message, payload.code);
this._currentFailure = this._errorHandler.handleAndTransformError(error, this.hostPort);
this._currentFailure = this._errorHandler.handleAndTransformError(error, this.address);
this._currentObserver.onError( this._currentFailure );
} finally {
this._updateCurrentObserver();
Expand Down
8 changes: 5 additions & 3 deletions src/v1/internal/http/http-driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,18 @@
import Driver from '../../driver';
import HttpSession from './http-session';
import HttpSessionTracker from './http-session-tracker';
import ServerAddress from '../server-address';

export default class HttpDriver extends Driver {

constructor(hostPort, userAgent, token, config) {
super(hostPort, userAgent, token, config);
constructor(url, userAgent, token, config) {
super(ServerAddress.fromUrl(url.hostAndPort), userAgent, token, config);
this._url = url;
this._sessionTracker = new HttpSessionTracker();
}

session() {
return new HttpSession(this._hostPort, this._authToken, this._config, this._sessionTracker);
return new HttpSession(this._url, this._authToken, this._config, this._sessionTracker);
}

close() {
Expand Down
Loading