Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ export default class PooledConnectionProvider extends ConnectionProvider {
*/
_createConnection ({ auth }, address, release) {
return this._createChannelConnection(address).then(connection => {
connection._release = () => {
connection.release = () => {
return release(address, connection)
}
this._openConnections[connection.id] = connection
Expand Down Expand Up @@ -160,7 +160,7 @@ export default class PooledConnectionProvider extends ConnectionProvider {
await connection.resetAndFlush()
}
} finally {
await connection._release()
await connection.release()
}
return serverInfo
}
Expand Down Expand Up @@ -191,7 +191,7 @@ export default class PooledConnectionProvider extends ConnectionProvider {
}
throw error
} finally {
await Promise.all(connectionsToRelease.map(conn => conn._release()))
await Promise.all(connectionsToRelease.map(conn => conn.release()))
}
}

Expand All @@ -201,7 +201,7 @@ export default class PooledConnectionProvider extends ConnectionProvider {
connection._sticky = connectionWithSameCredentials && !connection.supportsReAuth

if (shouldCreateStickyConnection || connection._sticky) {
await connection._release()
await connection.release()
throw newError('Driver is connected to a database that does not support user switch.')
}
}
Expand Down
20 changes: 20 additions & 0 deletions packages/bolt-connection/src/connection/connection-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,26 @@ export default class ChannelConnection extends Connection {
}
}

beginTransaction (config) {
return this._protocol.beginTransaction(config)
}

run (query, parameters, config) {
return this._protocol.run(query, parameters, config)
}

commitTransaction (config) {
return this._protocol.commitTransaction(config)
}

rollbackTransaction (config) {
return this._protocol.rollbackTransaction(config)
}

getProtocolVersion () {
return this._protocol.version
}

get authToken () {
return this._authToken
}
Expand Down
24 changes: 22 additions & 2 deletions packages/bolt-connection/src/connection/connection-delegate.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,26 @@ export default class DelegateConnection extends Connection {
this._delegate = delegate
}

beginTransaction (config) {
return this._delegate.beginTransaction(config)
}

run (query, param, config) {
return this._delegate.run(query, param, config)
}

commitTransaction (config) {
return this._delegate.commitTransaction(config)
}

rollbackTransaction (config) {
return this._delegate.rollbackTransaction(config)
}

getProtocolVersion () {
return this._delegate.getProtocolVersion()
}

get id () {
return this._delegate.id
}
Expand Down Expand Up @@ -103,11 +123,11 @@ export default class DelegateConnection extends Connection {
return this._delegate.close()
}

_release () {
release () {
if (this._originalErrorHandler) {
this._delegate._errorHandler = this._originalErrorHandler
}

return this._delegate._release()
return this._delegate.release()
}
}
23 changes: 3 additions & 20 deletions packages/bolt-connection/src/connection/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
*/
// eslint-disable-next-line no-unused-vars
import { ResultStreamObserver, BoltProtocol } from '../bolt'
import { Connection as CoreConnection } from 'neo4j-driver-core'

export default class Connection {
export default class Connection extends CoreConnection {
/**
* @param {ConnectionErrorHandler} errorHandler the error handler
*/
constructor (errorHandler) {
super()
this._errorHandler = errorHandler
}

Expand Down Expand Up @@ -51,13 +53,6 @@ export default class Connection {
throw new Error('not implemented')
}

/**
* @returns {boolean} whether this connection is in a working condition
*/
isOpen () {
throw new Error('not implemented')
}

/**
* @returns {BoltProtocol} the underlying bolt protocol assigned to this connection
*/
Expand Down Expand Up @@ -109,18 +104,6 @@ export default class Connection {
throw new Error('not implemented')
}

/**
* Send a RESET-message to the database. Message is immediately flushed to the network.
* @return {Promise<void>} promise resolved when SUCCESS-message response arrives, or failed when other response messages arrives.
*/
resetAndFlush () {
throw new Error('not implemented')
}

hasOngoingObservableRequests () {
throw new Error('not implemented')
}

/**
* Call close on the channel.
* @returns {Promise<void>} - A promise that will be resolved when the connection is closed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ describe('constructor', () => {

const connection = await create({}, server0, release)

const released = connection._release()
const released = connection.release()

expect(released).toBe(releaseResult)
expect(release).toHaveBeenCalledWith(server0, connection)
Expand Down Expand Up @@ -546,7 +546,7 @@ describe('user-switching', () => {

expect(error).toEqual(newError('Driver is connected to a database that does not support user switch.'))
expect(poolAcquire).toHaveBeenCalledWith({ auth: acquireAuth }, address)
expect(connection._release).toHaveBeenCalled()
expect(connection.release).toHaveBeenCalled()
expect(connection._sticky).toEqual(isStickyConn)
})
})
Expand Down Expand Up @@ -599,15 +599,15 @@ describe('.verifyConnectivityAndGetServerInfo()', () => {

await connectionProvider.verifyConnectivityAndGetServerInfo()

expect(seenConnections[0]._release).toHaveBeenCalledTimes(1)
expect(seenConnections[0].release).toHaveBeenCalledTimes(1)
})

it('should resetAndFlush and then release the connection', async () => {
const { connectionProvider, seenConnections, resetAndFlush } = setup()

await connectionProvider.verifyConnectivityAndGetServerInfo()

expect(seenConnections[0]._release.mock.invocationCallOrder[0])
expect(seenConnections[0].release.mock.invocationCallOrder[0])
.toBeGreaterThan(resetAndFlush.mock.invocationCallOrder[0])
})

Expand Down Expand Up @@ -636,7 +636,7 @@ describe('.verifyConnectivityAndGetServerInfo()', () => {
await connectionProvider.verifyConnectivityAndGetServerInfo()
} catch (e) {
} finally {
expect(seenConnections[0]._release).toHaveBeenCalledTimes(1)
expect(seenConnections[0].release).toHaveBeenCalledTimes(1)
}
})

Expand Down Expand Up @@ -692,7 +692,7 @@ describe('.verifyConnectivityAndGetServerInfo()', () => {
}
connection.resetAndFlush = resetAndFlush
if (releaseMock) {
connection._release = releaseMock
connection.release = releaseMock
}
seenConnections.push(connection)
return connection
Expand Down Expand Up @@ -782,7 +782,7 @@ class FakeConnection extends Connection {
super(null)

this._address = address
this._release = jest.fn(() => release(address, this))
this.release = jest.fn(() => release(address, this))
this._server = server
this._authToken = auth
this._closed = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2834,8 +2834,8 @@ describe.each([

expect(connections.length).toBe(1)
expect(connections[0].resetAndFlush).toHaveBeenCalled()
expect(connections[0]._release).toHaveBeenCalled()
expect(connections[0]._release.mock.invocationCallOrder[0])
expect(connections[0].release).toHaveBeenCalled()
expect(connections[0].release.mock.invocationCallOrder[0])
.toBeGreaterThan(connections[0].resetAndFlush.mock.invocationCallOrder[0])
})

Expand All @@ -2856,7 +2856,7 @@ describe.each([

// extra checks
expect(connections.length).toBe(1)
expect(connections[0]._release).toHaveBeenCalled()
expect(connections[0].release).toHaveBeenCalled()
})

it('should not acquire, resetAndFlush and release connections for sever with the other access mode', async () => {
Expand Down Expand Up @@ -2900,7 +2900,7 @@ describe.each([

expect(connections.length).toBe(1)
expect(connections[0].resetAndFlush).toHaveBeenCalled()
expect(connections[0]._release).toHaveBeenCalled()
expect(connections[0].release).toHaveBeenCalled()
}
}
})
Expand Down Expand Up @@ -2956,8 +2956,8 @@ describe.each([

expect(connections.length).toBe(1)
expect(connections[0].resetAndFlush).toHaveBeenCalled()
expect(connections[0]._release).toHaveBeenCalled()
expect(connections[0]._release.mock.invocationCallOrder[0])
expect(connections[0].release).toHaveBeenCalled()
expect(connections[0].release.mock.invocationCallOrder[0])
.toBeGreaterThan(connections[0].resetAndFlush.mock.invocationCallOrder[0])
}
}
Expand All @@ -2979,7 +2979,7 @@ describe.each([

expect(connections.length).toBe(1)
expect(connections[0].resetAndFlush).toHaveBeenCalled()
expect(connections[0]._release).toHaveBeenCalled()
expect(connections[0].release).toHaveBeenCalled()
}
}
})
Expand Down Expand Up @@ -3054,7 +3054,7 @@ describe.each([
connection.resetAndFlush = resetAndFlush
}
if (releaseMock) {
connection._release = releaseMock
connection.release = releaseMock
}
seenConnectionsPerAddress.get(address).push(connection)
return connection
Expand Down Expand Up @@ -3193,7 +3193,7 @@ describe.each([

const connection = await create({}, server0, release)

const released = connection._release()
const released = connection.release()

expect(released).toBe(releaseResult)
expect(release).toHaveBeenCalledWith(server0, connection)
Expand Down Expand Up @@ -3460,7 +3460,7 @@ describe.each([

expect(error).toEqual(newError('Driver is connected to a database that does not support user switch.'))
expect(poolAcquire).toHaveBeenCalledWith({ auth }, server3)
expect(connection._release).toHaveBeenCalled()
expect(connection.release).toHaveBeenCalled()
expect(connection._sticky).toEqual(isStickyConn)
})

Expand Down Expand Up @@ -3502,7 +3502,7 @@ describe.each([

expect(error).toEqual(newError('Driver is connected to a database that does not support user switch.'))
expect(poolAcquire).toHaveBeenCalledWith({ auth }, server1)
expect(connection._release).toHaveBeenCalled()
expect(connection.release).toHaveBeenCalled()
expect(connection._sticky).toEqual(isStickyConn)
})

Expand Down Expand Up @@ -3546,7 +3546,7 @@ describe.each([

expect(error).toEqual(newError('Driver is connected to a database that does not support user switch.'))
expect(poolAcquire).toHaveBeenCalledWith({ auth }, server0)
expect(connection._release).toHaveBeenCalled()
expect(connection.release).toHaveBeenCalled()
expect(connection._sticky).toEqual(isStickyConn)
})

Expand Down Expand Up @@ -3575,7 +3575,7 @@ describe.each([

expect(error).toEqual(newError('Driver is connected to a database that does not support user switch.'))
expect(poolAcquire).toHaveBeenCalledWith({ auth }, server0)
expect(connection._release).toHaveBeenCalled()
expect(connection.release).toHaveBeenCalled()
expect(connection._sticky).toEqual(isStickyConn)
})
})
Expand Down Expand Up @@ -3903,7 +3903,7 @@ class FakeConnection extends Connection {
this._version = version
this._protocolVersion = protocolVersion
this.release = release
this._release = jest.fn(() => release(address, this))
this.release = jest.fn(() => release(address, this))
this.resetAndFlush = jest.fn(() => Promise.resolve())
this._server = server
this._authToken = authToken
Expand Down
Loading