Skip to content

Fix verifyConnectivity/getServerInfo procedures #888

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,21 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
})

const servers = accessMode === WRITE ? routingTable.writers : routingTable.readers

let error = newError(
`No servers available for database '${context.database}' with access mode '${accessMode}'`,
SERVICE_UNAVAILABLE
)

return Promise.all(servers.map(address => this._verifyConnectivityAndGetServerVersion({ address })))
.then(([serverInfo]) => serverInfo)
for (const address of servers) {
try {
const serverInfo = await this._verifyConnectivityAndGetServerVersion({ address })
return serverInfo
} catch (e) {
error = e
}
}
throw error
}

forget (address, database) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2493,24 +2493,24 @@ describe('#unit RoutingConnectionProvider', () => {
expect(serverInfo).toEqual(new ServerInfo(server, protocolVersion))
})

it('should acquire, resetAndFlush and release connections for sever with the selected access mode', async () => {
it('should acquire, resetAndFlush and release connections for sever first', async () => {
const { connectionProvider, routingTable, seenConnectionsPerAddress, pool } = setup()
const acquireSpy = jest.spyOn(pool, 'acquire')

await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode })

const targetServers = accessMode === WRITE ? routingTable.writers : routingTable.readers
for (const address of targetServers) {
expect(acquireSpy).toHaveBeenCalledWith(address)
const address = targetServers[0]
expect(acquireSpy).toHaveBeenCalledWith(address)

const connections = seenConnectionsPerAddress.get(address)
const connections = seenConnectionsPerAddress.get(address)

expect(connections.length).toBe(1)
expect(connections[0].resetAndFlush).toHaveBeenCalled()
expect(connections[0]._release).toHaveBeenCalled()
expect(connections[0]._release.mock.invocationCallOrder[0])
.toBeGreaterThan(connections[0].resetAndFlush.mock.invocationCallOrder[0])
}
expect(connections.length).toBe(1)
expect(connections[0].resetAndFlush).toHaveBeenCalled()
expect(connections[0]._release).toHaveBeenCalled()
expect(connections[0]._release.mock.invocationCallOrder[0])
.toBeGreaterThan(connections[0].resetAndFlush.mock.invocationCallOrder[0])

})

it('should not acquire, resetAndFlush and release connections for sever with the other access mode', async () => {
Expand All @@ -2527,20 +2527,96 @@ describe('#unit RoutingConnectionProvider', () => {
})

describe('when the reset and flush fails for at least one the address', () => {
it('should fails with the reset and flush error', async () => {
it('should succeed with the server info ', async () => {
const error = newError('Error')
let i = 0
const resetAndFlush = jest.fn(() => i++ % 2 == 0 ? Promise.reject(error) : Promise.resolve())
const { connectionProvider } = setup({ resetAndFlush })
const { connectionProvider, server, protocolVersion } = setup({ resetAndFlush })

const serverInfo = await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode })

expect(serverInfo).toEqual(new ServerInfo(server, protocolVersion))
})

it('should release the connection', async () => {
const error = newError('Error')
let i = 0
const resetAndFlush = jest.fn(() => i++ % 2 == 0 ? Promise.reject(error) : Promise.resolve())
const { connectionProvider, seenConnectionsPerAddress, routingTable } = setup({ resetAndFlush })

try {
await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode })
expect().toBe('Not reached')
} catch (e) {
} finally {
const targetServers = accessMode === WRITE ? routingTable.writers : routingTable.readers
for (const address of targetServers) {
const connections = seenConnectionsPerAddress.get(address)

expect(connections.length).toBe(1)
expect(connections[0].resetAndFlush).toHaveBeenCalled()
expect(connections[0]._release).toHaveBeenCalled()
}
}
})

describe('and the release fails', () => {
it('should fails with the release error', async () => {
const error = newError('Error')
const releaseError = newError('Release error')
let i = 0
const resetAndFlush = jest.fn(() => i++ % 2 == 0 ? Promise.reject(error) : Promise.resolve())
const releaseMock = jest.fn(() => Promise.reject(releaseError))
const { connectionProvider } = setup({ resetAndFlush, releaseMock })

try {
await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode })
expect().toBe('Not reached')
} catch (e) {
expect(e).toBe(releaseError)
}
})
})
})

describe('when the reset and flush fails for all addresses', () => {
it('should succeed with the server info ', async () => {
const error = newError('Error')
const resetAndFlush = jest.fn(() => Promise.reject(error))
const { connectionProvider } = setup({ resetAndFlush })

try {
await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode })
expect().toBe('Not reached')
} catch(e) {
expect(e).toBe(error)
}
})

it('should acquire, resetAndFlush and release connections for all servers', async () => {
const resetAndFlush = jest.fn(() => Promise.reject(error))
const { connectionProvider, routingTable, seenConnectionsPerAddress, pool } = setup({ resetAndFlush })
const acquireSpy = jest.spyOn(pool, 'acquire')

try {
await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode })
} catch (e) {
// nothing to do
} finally {
const targetServers = accessMode === WRITE ? routingTable.writers : routingTable.readers
for (const address of targetServers) {
expect(acquireSpy).toHaveBeenCalledWith(address)

const connections = seenConnectionsPerAddress.get(address)

expect(connections.length).toBe(1)
expect(connections[0].resetAndFlush).toHaveBeenCalled()
expect(connections[0]._release).toHaveBeenCalled()
expect(connections[0]._release.mock.invocationCallOrder[0])
.toBeGreaterThan(connections[0].resetAndFlush.mock.invocationCallOrder[0])
}
}
})

it('should release the connection', async () => {
const error = newError('Error')
let i = 0
Expand Down Expand Up @@ -2583,16 +2659,28 @@ describe('#unit RoutingConnectionProvider', () => {
})

describe('when the release for at least one the address', () => {
it('should fails with the reset and flush error', async () => {
it('should succeed with the server info', async () => {
const error = newError('Error')
let i = 0
const releaseMock = jest.fn(() => i++ % 2 == 0 ? Promise.reject(error) : Promise.resolve())
const { connectionProvider, server, protocolVersion } = setup({ releaseMock })

const serverInfo = await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode })

expect(serverInfo).toEqual(new ServerInfo(server, protocolVersion))
})
})

describe('when the release for all address', () => {
it('should fail with release error', async () => {
const error = newError('Error')
const releaseMock = jest.fn(() => Promise.reject(error))
const { connectionProvider } = setup({ releaseMock })

try {
await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode })
expect().toBe('Not reached')
} catch (e) {
} catch(e) {
expect(e).toBe(error)
}
})
Expand Down Expand Up @@ -2668,7 +2756,7 @@ describe('#unit RoutingConnectionProvider', () => {
})

describe('when at least the one of the connections could not be created', () => {
it('should reject with acquistion timeout error', async () => {
it('should succeed with the server info', async () => {
let i = 0
const error = new Error('Connection creation error')
const routingTable = newRoutingTable(
Expand All @@ -2694,11 +2782,46 @@ describe('#unit RoutingConnectionProvider', () => {
pool
)

const serverInfo = connectionProvider = await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode })
expect(serverInfo).toEqual(new ServerInfo({}, 4.4))
})
})

describe('when there is not server available in the routing table', () => {
it('should thown an discovery error', async () => {
const expectedError = newError(
`No servers available for database '${database || null}' with access mode '${accessMode || READ}'`,
SERVICE_UNAVAILABLE
)
const routingTable = newRoutingTable(
database || null,
[server1, server2],
accessMode === READ ? [] : [server3, server4],
accessMode === WRITE ? [] : [server5, server6],
Integer.MAX_SAFE_VALUE
)

const routingTableToRouters = {}
const routingMap = {}
routingMap[server0.asKey()] = routingTable
routingMap[server1.asKey()] = routingTable
routingMap[server2.asKey()] = routingTable
routingTableToRouters[database || null] = routingMap

const connectionProvider = newRoutingConnectionProviderWithSeedRouter(
server0,
[server0], // seed router address resolves just to itself
[
routingTable
],
routingTableToRouters
)

try {
connectionProvider = await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode })
await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode })
expect().toBe('not reached')
} catch (e) {
expect(e).toBe(error)
} catch (error) {
expect(error).toEqual(expectedError)
}
})
})
Expand Down