Skip to content

Commit cc4598b

Browse files
authored
Avoid sending resets when it is not needed (#902)
The `RESET` should be send when a failure occurs or whenever the connection is being sent back to the pool with a pending request running, i.e. the bolt server is not in the `READY` state. These changes also affect the `verifyConnectivity` and `getServerInfo` implementation. The `RESET` message is not sent in these methods if it is a newly created connection.
1 parent c0580eb commit cc4598b

17 files changed

+352
-14
lines changed

packages/bolt-connection/src/bolt/bolt-protocol-v1.js

+11
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ export default class BoltProtocol {
7979
this._log = log
8080
this._onProtocolError = onProtocolError
8181
this._fatalError = null
82+
this._lastMessageSignature = null
8283
}
8384

8485
/**
@@ -356,6 +357,8 @@ export default class BoltProtocol {
356357
this._log.debug(`C: ${message}`)
357358
}
358359

360+
this._lastMessageSignature = message.signature
361+
359362
this.packer().packStruct(
360363
message.signature,
361364
message.fields.map(field => this.packer().packable(field))
@@ -369,6 +372,14 @@ export default class BoltProtocol {
369372
}
370373
}
371374

375+
isLastMessageLogin () {
376+
return this._lastMessageSignature === 0x01
377+
}
378+
379+
isLastMessageReset () {
380+
return this._lastMessageSignature === 0x0f
381+
}
382+
372383
/**
373384
* Notifies faltal erros to the observers and mark the protocol in the fatal error state.
374385
* @param {Error} error The error

packages/bolt-connection/src/connection-provider/connection-provider-pooled.js

+6-2
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,9 @@ export default class PooledConnectionProvider extends ConnectionProvider {
7171
*/
7272
_createConnection (address, release) {
7373
return this._createChannelConnection(address).then(connection => {
74-
connection._release = () => release(address, connection)
74+
connection._release = () => {
75+
return release(address, connection)
76+
}
7577
this._openConnections[connection.id] = connection
7678
return connection
7779
.connect(this._userAgent, this._authToken)
@@ -119,7 +121,9 @@ export default class PooledConnectionProvider extends ConnectionProvider {
119121
const connection = await this._connectionPool.acquire(address)
120122
const serverInfo = new ServerInfo(connection.server, connection.protocol().version)
121123
try {
122-
await connection.resetAndFlush()
124+
if (!connection.protocol().isLastMessageLogin()) {
125+
await connection.resetAndFlush()
126+
}
123127
} finally {
124128
await connection._release()
125129
}

packages/bolt-connection/src/connection/connection-channel.js

+39-2
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,8 @@ export default class ChannelConnection extends Connection {
123123
) {
124124
super(errorHandler)
125125

126+
this._reseting = false
127+
this._resetObservers = []
126128
this._id = idGenerator++
127129
this._address = address
128130
this._server = { address: address.asHostPort() }
@@ -304,7 +306,7 @@ export default class ChannelConnection extends Connection {
304306
*/
305307
resetAndFlush () {
306308
return new Promise((resolve, reject) => {
307-
this._protocol.reset({
309+
this._reset({
308310
onError: error => {
309311
if (this._isBroken) {
310312
// handling a fatal error, no need to raise a protocol violation
@@ -328,7 +330,7 @@ export default class ChannelConnection extends Connection {
328330
return
329331
}
330332

331-
this._protocol.reset({
333+
this._reset({
332334
onError: () => {
333335
this._protocol.resetFailure()
334336
},
@@ -338,6 +340,41 @@ export default class ChannelConnection extends Connection {
338340
})
339341
}
340342

343+
_reset(observer) {
344+
if (this._reseting) {
345+
if (!this._protocol.isLastMessageReset()) {
346+
this._protocol.reset({
347+
onError: error => {
348+
observer.onError(error)
349+
}, onComplete: () => {
350+
observer.onComplete()
351+
}
352+
})
353+
} else {
354+
this._resetObservers.push(observer)
355+
}
356+
return
357+
}
358+
359+
this._resetObservers.push(observer)
360+
this._reseting = true
361+
362+
const notifyFinish = (notify) => {
363+
this._reseting = false
364+
const observers = this._resetObservers
365+
this._resetObservers = []
366+
observers.forEach(notify)
367+
}
368+
369+
this._protocol.reset({
370+
onError: error => {
371+
notifyFinish(obs => obs.onError(error))
372+
}, onComplete: () => {
373+
notifyFinish(obs => obs.onComplete())
374+
}
375+
})
376+
}
377+
341378
/*
342379
* Pop next pending observer form the list of observers and make it current observer.
343380
* @protected

packages/bolt-connection/src/connection/connection-delegate.js

+4
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ export default class DelegateConnection extends Connection {
8383
return this._delegate.resetAndFlush()
8484
}
8585

86+
hasOngoingObservableRequests () {
87+
return this._delegate.hasOngoingObservableRequests()
88+
}
89+
8690
close () {
8791
return this._delegate.close()
8892
}

packages/bolt-connection/src/connection/connection.js

+4
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,10 @@ export default class Connection {
103103
throw new Error('not implemented')
104104
}
105105

106+
hasOngoingObservableRequests () {
107+
throw new Error('not implemented')
108+
}
109+
106110
/**
107111
* Call close on the channel.
108112
* @returns {Promise<void>} - A promise that will be resolved when the connection is closed.

packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ describe('.verifyConnectivityAndGetServerInfo()', () => {
251251
const create = (address, release) => {
252252
const connection = new FakeConnection(address, release, server)
253253
connection.protocol = () => {
254-
return { version: protocolVersion }
254+
return { version: protocolVersion, isLastMessageLogin() { return false } }
255255
}
256256
connection.resetAndFlush = resetAndFlush
257257
if (releaseMock) {

packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js

+24-2
Original file line numberDiff line numberDiff line change
@@ -2584,6 +2584,26 @@ describe('#unit RoutingConnectionProvider', () => {
25842584
.toBeGreaterThan(connections[0].resetAndFlush.mock.invocationCallOrder[0])
25852585
})
25862586

2587+
it('should not call resetAndFlush for newly created connections', async () => {
2588+
const { connectionProvider, routingTable, seenConnectionsPerAddress, pool } = setup({ newConnection: true })
2589+
const acquireSpy = jest.spyOn(pool, 'acquire')
2590+
2591+
await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode })
2592+
2593+
const targetServers = accessMode === WRITE ? routingTable.writers : routingTable.readers
2594+
const address = targetServers[0]
2595+
expect(acquireSpy).toHaveBeenCalledWith(address)
2596+
2597+
const connections = seenConnectionsPerAddress.get(address)
2598+
2599+
// verifying resetAndFlush was not called
2600+
expect(connections[0].resetAndFlush).not.toHaveBeenCalled()
2601+
2602+
// extra checks
2603+
expect(connections.length).toBe(1)
2604+
expect(connections[0]._release).toHaveBeenCalled()
2605+
})
2606+
25872607
it('should not acquire, resetAndFlush and release connections for sever with the other access mode', async () => {
25882608
const { connectionProvider, routingTable, seenConnectionsPerAddress, pool } = setup()
25892609
const acquireSpy = jest.spyOn(pool, 'acquire')
@@ -2756,7 +2776,7 @@ describe('#unit RoutingConnectionProvider', () => {
27562776
})
27572777
})
27582778

2759-
function setup ({ resetAndFlush, releaseMock } = {}) {
2779+
function setup ({ resetAndFlush, releaseMock, newConnection } = { }) {
27602780
const routingTable = newRoutingTable(
27612781
database || null,
27622782
[server1, server2],
@@ -2774,6 +2794,7 @@ describe('#unit RoutingConnectionProvider', () => {
27742794
seenConnectionsPerAddress.set(address, [])
27752795
}
27762796
const connection = new FakeConnection(address, release, 'version', protocolVersion, server)
2797+
connection._firstUsage = !!newConnection
27772798
if (resetAndFlush) {
27782799
connection.resetAndFlush = resetAndFlush
27792800
}
@@ -3082,7 +3103,8 @@ class FakeConnection extends Connection {
30823103

30833104
protocol () {
30843105
return {
3085-
version: this._protocolVersion
3106+
version: this._protocolVersion,
3107+
isLastMessageLogin: () => this._firstUsage
30863108
}
30873109
}
30883110
}

packages/bolt-connection/test/connection/connection-channel.test.js

+174
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,76 @@ describe('ChannelConnection', () => {
260260
expect(protocol.reset).toHaveBeenCalled()
261261
expect(protocol.resetFailure).toHaveBeenCalled()
262262
})
263+
264+
it('should not call protocol.reset() when there is an ongoing reset', () => {
265+
const channel = {
266+
_open: true
267+
}
268+
269+
const protocol = {
270+
reset: jest.fn(),
271+
resetFailure: jest.fn(),
272+
isLastMessageReset: jest.fn(() => true)
273+
}
274+
const protocolSupplier = () => protocol
275+
const connection = spyOnConnectionChannel({ channel, protocolSupplier })
276+
277+
connection._resetOnFailure()
278+
279+
expect(protocol.reset).toHaveBeenCalledTimes(1)
280+
expect(protocol.resetFailure).not.toHaveBeenCalled()
281+
282+
connection._resetOnFailure()
283+
284+
expect(protocol.reset).toHaveBeenCalledTimes(1)
285+
expect(protocol.resetFailure).not.toHaveBeenCalled()
286+
})
287+
288+
it('should call protocol.reset() when after a previous reset completed', () => {
289+
const channel = {
290+
_open: true
291+
}
292+
293+
const protocol = {
294+
reset: jest.fn(observer => observer.onComplete()),
295+
resetFailure: jest.fn()
296+
}
297+
const protocolSupplier = () => protocol
298+
const connection = spyOnConnectionChannel({ channel, protocolSupplier })
299+
300+
connection._resetOnFailure()
301+
302+
expect(protocol.reset).toHaveBeenCalledTimes(1)
303+
expect(protocol.resetFailure).toHaveBeenCalledTimes(1)
304+
305+
connection._resetOnFailure()
306+
307+
expect(protocol.reset).toHaveBeenCalledTimes(2)
308+
expect(protocol.resetFailure).toHaveBeenCalledTimes(2)
309+
})
310+
311+
it('should call protocol.reset() when after a previous reset fail', () => {
312+
const channel = {
313+
_open: true
314+
}
315+
316+
const protocol = {
317+
reset: jest.fn(observer => observer.onError(new Error('some error'))),
318+
resetFailure: jest.fn()
319+
}
320+
const protocolSupplier = () => protocol
321+
const connection = spyOnConnectionChannel({ channel, protocolSupplier })
322+
323+
connection._resetOnFailure()
324+
325+
expect(protocol.reset).toHaveBeenCalledTimes(1)
326+
expect(protocol.resetFailure).toHaveBeenCalledTimes(1)
327+
328+
connection._resetOnFailure()
329+
330+
expect(protocol.reset).toHaveBeenCalledTimes(2)
331+
expect(protocol.resetFailure).toHaveBeenCalledTimes(2)
332+
})
263333
})
264334

265335
describe('when connection is not open', () => {
@@ -340,6 +410,110 @@ describe('ChannelConnection', () => {
340410
})
341411
})
342412

413+
describe('.resetAndFlush()', () => {
414+
it('should call protocol.reset() onComplete', async () => {
415+
const channel = {
416+
_open: true
417+
}
418+
419+
const protocol = {
420+
reset: jest.fn(observer => observer.onComplete()),
421+
resetFailure: jest.fn()
422+
}
423+
const protocolSupplier = () => protocol
424+
const connection = spyOnConnectionChannel({ channel, protocolSupplier })
425+
426+
await connection.resetAndFlush().catch(() => {})
427+
428+
expect(protocol.reset).toHaveBeenCalled()
429+
})
430+
431+
it('should call protocol.reset() onError', async () => {
432+
const channel = {
433+
_open: true
434+
}
435+
436+
const protocol = {
437+
reset: jest.fn(observer => observer.onError()),
438+
resetFailure: jest.fn()
439+
}
440+
const protocolSupplier = () => protocol
441+
const connection = spyOnConnectionChannel({ channel, protocolSupplier })
442+
443+
await connection.resetAndFlush().catch(() => {})
444+
445+
expect(protocol.reset).toHaveBeenCalled()
446+
})
447+
448+
it('should not call protocol.reset() when there is an ongoing reset', async () => {
449+
const channel = {
450+
_open: true
451+
}
452+
453+
const protocol = {
454+
reset: jest.fn(observer => {
455+
setTimeout(() => observer.onComplete(), 100)
456+
}),
457+
resetFailure: jest.fn(),
458+
isLastMessageReset: jest.fn(() => true)
459+
}
460+
const protocolSupplier = () => protocol
461+
const connection = spyOnConnectionChannel({ channel, protocolSupplier })
462+
463+
const completeFirstResetAndFlush = connection.resetAndFlush()
464+
465+
expect(protocol.reset).toHaveBeenCalledTimes(1)
466+
467+
await connection.resetAndFlush()
468+
469+
expect(protocol.reset).toHaveBeenCalledTimes(1)
470+
471+
await completeFirstResetAndFlush
472+
})
473+
474+
it('should call protocol.reset() when after a previous reset completed', async () => {
475+
const channel = {
476+
_open: true
477+
}
478+
479+
const protocol = {
480+
reset: jest.fn(observer => observer.onComplete()),
481+
resetFailure: jest.fn()
482+
}
483+
const protocolSupplier = () => protocol
484+
const connection = spyOnConnectionChannel({ channel, protocolSupplier })
485+
486+
await connection.resetAndFlush()
487+
488+
expect(protocol.reset).toHaveBeenCalledTimes(1)
489+
490+
await connection.resetAndFlush()
491+
492+
expect(protocol.reset).toHaveBeenCalledTimes(2)
493+
})
494+
495+
it('should call protocol.reset() when after a previous reset fail', async () => {
496+
const channel = {
497+
_open: true
498+
}
499+
500+
const protocol = {
501+
reset: jest.fn(observer => observer.onError(new Error('some error'))),
502+
resetFailure: jest.fn()
503+
}
504+
const protocolSupplier = () => protocol
505+
const connection = spyOnConnectionChannel({ channel, protocolSupplier })
506+
507+
await connection.resetAndFlush().catch(() => {})
508+
509+
expect(protocol.reset).toHaveBeenCalledTimes(1)
510+
511+
await connection.resetAndFlush().catch(() => {})
512+
513+
expect(protocol.reset).toHaveBeenCalledTimes(2)
514+
})
515+
})
516+
343517
function spyOnConnectionChannel ({
344518
channel,
345519
errorHandler,

0 commit comments

Comments
 (0)