Skip to content

Revert "Introduce sessionConnectionTimeout and updateRoutingTableTimeout" #969

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import {
ConnectionErrorHandler
} from '../connection'
import { internal, error } from 'neo4j-driver-core'
import { controller } from '../lang'

const {
constants: { BOLT_PROTOCOL_V3, BOLT_PROTOCOL_V4_0, BOLT_PROTOCOL_V4_4 }
Expand All @@ -50,17 +49,12 @@ export default class DirectConnectionProvider extends PooledConnectionProvider {
this._handleAuthorizationExpired(error, address, database)
})

const acquireConnectionJob = {
run: () => this._connectionPool
.acquire(this._address)
.then(
connection =>
new DelegateConnection(connection, databaseSpecificErrorHandler)
),
onTimeout: connection => connection._release()
}

return controller.runWithTimeout(this._sessionConnectionTimeoutConfig, acquireConnectionJob)
return this._connectionPool
.acquire(this._address)
.then(
connection =>
new DelegateConnection(connection, databaseSpecificErrorHandler)
)
}

_handleAuthorizationExpired (error, address, database) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import { createChannelConnection, ConnectionErrorHandler } from '../connection'
import Pool, { PoolConfig } from '../pool'
import { error, newError, ConnectionProvider, ServerInfo } from 'neo4j-driver-core'
import { error, ConnectionProvider, ServerInfo } from 'neo4j-driver-core'

const { SERVICE_UNAVAILABLE } = error
export default class PooledConnectionProvider extends ConnectionProvider {
Expand Down Expand Up @@ -58,12 +58,6 @@ export default class PooledConnectionProvider extends ConnectionProvider {
log: this._log
})
this._openConnections = {}
this._sessionConnectionTimeoutConfig = {
timeout: this._config.sessionConnectionTimeout,
reason: () => newError(
`Session acquisition timed out in ${this._config.sessionConnectionTimeout} ms.`
)
}
}

_createConnectionErrorHandler () {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import { HostNameResolver } from '../channel'
import SingleConnectionProvider from './connection-provider-single'
import PooledConnectionProvider from './connection-provider-pooled'
import { LeastConnectedLoadBalancingStrategy } from '../load-balancing'
import { controller } from '../lang'
import {
createChannelConnection,
ConnectionErrorHandler,
Expand Down Expand Up @@ -76,13 +75,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
)
})

this._updateRoutingTableTimeoutConfig = {
timeout: this._config.updateRoutingTableTimeout,
reason: () => newError(
`Routing table update timed out in ${this._config.updateRoutingTableTimeout} ms.`
)
}

this._routingContext = { ...routingContext, address: address.toString() }
this._seedRouter = address
this._rediscovery = new Rediscovery(this._routingContext)
Expand Down Expand Up @@ -151,66 +143,53 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
this._handleAuthorizationExpired(error, address, context.database)
)

const refreshRoutingTableJob = {
run: async (_, cancelationToken) => {
const routingTable = await this._freshRoutingTable({
accessMode,
database: context.database,
bookmarks: bookmarks,
impersonatedUser,
onDatabaseNameResolved: (databaseName) => {
context.database = context.database || databaseName
if (onDatabaseNameResolved) {
onDatabaseNameResolved(databaseName)
}
},
cancelationToken
})

// select a target server based on specified access mode
if (accessMode === READ) {
address = this._loadBalancingStrategy.selectReader(routingTable.readers)
name = 'read'
} else if (accessMode === WRITE) {
address = this._loadBalancingStrategy.selectWriter(routingTable.writers)
name = 'write'
} else {
throw newError('Illegal mode ' + accessMode)
}

// we couldn't select a target server
if (!address) {
throw newError(
`Failed to obtain connection towards ${name} server. Known routing table is: ${routingTable}`,
SESSION_EXPIRED
)
const routingTable = await this._freshRoutingTable({
accessMode,
database: context.database,
bookmarks: bookmarks,
impersonatedUser,
onDatabaseNameResolved: (databaseName) => {
context.database = context.database || databaseName
if (onDatabaseNameResolved) {
onDatabaseNameResolved(databaseName)
}
return { routingTable, address }
}
}
})

const acquireConnectionJob = {
run: async ({ routingTable, address }) => {
try {
const connection = await this._acquireConnectionToServer(
address,
name,
routingTable
)
// select a target server based on specified access mode
if (accessMode === READ) {
address = this._loadBalancingStrategy.selectReader(routingTable.readers)
name = 'read'
} else if (accessMode === WRITE) {
address = this._loadBalancingStrategy.selectWriter(routingTable.writers)
name = 'write'
} else {
throw newError('Illegal mode ' + accessMode)
}

return new DelegateConnection(connection, databaseSpecificErrorHandler)
} catch (error) {
const transformed = databaseSpecificErrorHandler.handleAndTransformError(
error,
address
)
throw transformed
}
},
onTimeout: connection => connection._release()
// we couldn't select a target server
if (!address) {
throw newError(
`Failed to obtain connection towards ${name} server. Known routing table is: ${routingTable}`,
SESSION_EXPIRED
)
}

return controller.runWithTimeout(this._sessionConnectionTimeoutConfig, refreshRoutingTableJob, acquireConnectionJob)
try {
const connection = await this._acquireConnectionToServer(
address,
name,
routingTable
)

return new DelegateConnection(connection, databaseSpecificErrorHandler)
} catch (error) {
const transformed = databaseSpecificErrorHandler.handleAndTransformError(
error,
address
)
throw transformed
}
}

async _hasProtocolVersion (versionPredicate) {
Expand Down Expand Up @@ -322,28 +301,22 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
return this._connectionPool.acquire(address)
}

_freshRoutingTable ({ accessMode, database, bookmarks, impersonatedUser, onDatabaseNameResolved, cancelationToken = new controller.CancelationToken(() => false) } = {}) {
const refreshRoutingTableJob = {
run: (_, refreshCancelationToken) => {
const combinedCancelationToken = refreshCancelationToken.combine(cancelationToken)
const currentRoutingTable = this._routingTableRegistry.get(
database,
() => new RoutingTable({ database })
)
_freshRoutingTable ({ accessMode, database, bookmarks, impersonatedUser, onDatabaseNameResolved } = {}) {
const currentRoutingTable = this._routingTableRegistry.get(
database,
() => new RoutingTable({ database })
)

if (!currentRoutingTable.isStaleFor(accessMode)) {
return currentRoutingTable
}
this._log.info(
`Routing table is stale for database: "${database}" and access mode: "${accessMode}": ${currentRoutingTable}`
)
return this._refreshRoutingTable(currentRoutingTable, bookmarks, impersonatedUser, onDatabaseNameResolved, combinedCancelationToken)
}
if (!currentRoutingTable.isStaleFor(accessMode)) {
return currentRoutingTable
}
return controller.runWithTimeout(this._updateRoutingTableTimeoutConfig, refreshRoutingTableJob)
this._log.info(
`Routing table is stale for database: "${database}" and access mode: "${accessMode}": ${currentRoutingTable}`
)
return this._refreshRoutingTable(currentRoutingTable, bookmarks, impersonatedUser, onDatabaseNameResolved)
}

_refreshRoutingTable (currentRoutingTable, bookmarks, impersonatedUser, onDatabaseNameResolved, cancelationToken) {
_refreshRoutingTable (currentRoutingTable, bookmarks, impersonatedUser, onDatabaseNameResolved) {
const knownRouters = currentRoutingTable.routers

if (this._useSeedRouter) {
Expand All @@ -352,17 +325,15 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
currentRoutingTable,
bookmarks,
impersonatedUser,
onDatabaseNameResolved,
cancelationToken
onDatabaseNameResolved
)
}
return this._fetchRoutingTableFromKnownRoutersFallbackToSeedRouter(
knownRouters,
currentRoutingTable,
bookmarks,
impersonatedUser,
onDatabaseNameResolved,
cancelationToken
onDatabaseNameResolved
)
}

Expand All @@ -371,8 +342,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
currentRoutingTable,
bookmarks,
impersonatedUser,
onDatabaseNameResolved,
cancelationToken
onDatabaseNameResolved
) {
// we start with seed router, no routers were probed before
const seenRouters = []
Expand All @@ -381,8 +351,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
this._seedRouter,
currentRoutingTable,
bookmarks,
impersonatedUser,
cancelationToken
impersonatedUser
)

if (newRoutingTable) {
Expand All @@ -393,8 +362,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
knownRouters,
currentRoutingTable,
bookmarks,
impersonatedUser,
cancelationToken
impersonatedUser
)
newRoutingTable = newRoutingTable2
error = error2 || error
Expand All @@ -413,15 +381,13 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
currentRoutingTable,
bookmarks,
impersonatedUser,
onDatabaseNameResolved,
cancelationToken
onDatabaseNameResolved
) {
let [newRoutingTable, error] = await this._fetchRoutingTableUsingKnownRouters(
knownRouters,
currentRoutingTable,
bookmarks,
impersonatedUser,
cancelationToken
impersonatedUser
)

if (!newRoutingTable) {
Expand All @@ -431,8 +397,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
this._seedRouter,
currentRoutingTable,
bookmarks,
impersonatedUser,
cancelationToken
impersonatedUser
)
}

Expand All @@ -448,15 +413,13 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
knownRouters,
currentRoutingTable,
bookmarks,
impersonatedUser,
cancelationToken
impersonatedUser
) {
const [newRoutingTable, error] = await this._fetchRoutingTable(
knownRouters,
currentRoutingTable,
bookmarks,
impersonatedUser,
cancelationToken
impersonatedUser
)

if (newRoutingTable) {
Expand All @@ -481,19 +444,16 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
seedRouter,
routingTable,
bookmarks,
impersonatedUser,
cancelationToken
impersonatedUser
) {
const resolvedAddresses = await this._resolveSeedRouter(seedRouter)

cancelationToken.throwIfCancellationRequested()

// filter out all addresses that we've already tried
const newAddresses = resolvedAddresses.filter(
address => seenRouters.indexOf(address) < 0
)

return await this._fetchRoutingTable(newAddresses, routingTable, bookmarks, impersonatedUser, cancelationToken)
return await this._fetchRoutingTable(newAddresses, routingTable, bookmarks, impersonatedUser)
}

async _resolveSeedRouter (seedRouter) {
Expand All @@ -505,7 +465,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
return [].concat.apply([], dnsResolvedAddresses)
}

async _fetchRoutingTable (routerAddresses, routingTable, bookmarks, impersonatedUser, cancelationToken) {
async _fetchRoutingTable (routerAddresses, routingTable, bookmarks, impersonatedUser) {
return routerAddresses.reduce(
async (refreshedTablePromise, currentRouter, currentIndex) => {
const [newRoutingTable] = await refreshedTablePromise
Expand Down Expand Up @@ -539,13 +499,11 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
impersonatedUser
), null]
} catch (error) {
cancelationToken.throwIfCancellationRequested()
return this._handleRediscoveryError(error, currentRouter)
} finally {
await session.close()
session.close()
}
} else {
cancelationToken.throwIfCancellationRequested()
// unable to acquire connection and create session towards the current router
// return null to signal that the next router should be tried
return [null, error]
Expand Down
Loading