Skip to content

Commit 5b4f475

Browse files
authored
AsyncIterator API for Result consumption (#831)
The current driver implementation has only two options for record consumption, the default async/promise api and the reactive api. In the async side, there is no simple way to interact over the results before consuming all the data stream and back-pressure the consumption. Meanwhile in the reactive, the back-pressure is implemented but this api usage depends on the knowledge of reactive programming and `RxJS`. Another issue with our current reactive implementation is not having the way to configure the back-pressure or pause the stream consumption. The introduction of the Async Iterator is part of a group of improvements to make the driver more idiomatic and memory efficient.
1 parent a4efaff commit 5b4f475

35 files changed

+1956
-363
lines changed

packages/bolt-connection/src/bolt/bolt-protocol-v1.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,9 @@ export default class BoltProtocol {
278278
afterError,
279279
beforeComplete,
280280
afterComplete,
281-
flush = true
281+
flush = true,
282+
highRecordWatermark = Number.MAX_VALUE,
283+
lowRecordWatermark = Number.MAX_VALUE
282284
} = {}
283285
) {
284286
const observer = new ResultStreamObserver({
@@ -288,7 +290,9 @@ export default class BoltProtocol {
288290
beforeError,
289291
afterError,
290292
beforeComplete,
291-
afterComplete
293+
afterComplete,
294+
highRecordWatermark,
295+
lowRecordWatermark
292296
})
293297

294298
// bookmark and mode are ignored in this version of the protocol

packages/bolt-connection/src/bolt/bolt-protocol-v3.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,9 @@ export default class BoltProtocol extends BoltProtocolV2 {
163163
afterError,
164164
beforeComplete,
165165
afterComplete,
166-
flush = true
166+
flush = true,
167+
highRecordWatermark = Number.MAX_VALUE,
168+
lowRecordWatermark = Number.MAX_VALUE
167169
} = {}
168170
) {
169171
const observer = new ResultStreamObserver({
@@ -173,7 +175,9 @@ export default class BoltProtocol extends BoltProtocolV2 {
173175
beforeError,
174176
afterError,
175177
beforeComplete,
176-
afterComplete
178+
afterComplete,
179+
highRecordWatermark,
180+
lowRecordWatermark
177181
})
178182

179183
// passing in a database name on this protocol version throws an error

packages/bolt-connection/src/bolt/bolt-protocol-v4x0.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,9 @@ export default class BoltProtocol extends BoltProtocolV3 {
9090
afterComplete,
9191
flush = true,
9292
reactive = false,
93-
fetchSize = FETCH_ALL
93+
fetchSize = FETCH_ALL,
94+
highRecordWatermark = Number.MAX_VALUE,
95+
lowRecordWatermark = Number.MAX_VALUE
9496
} = {}
9597
) {
9698
const observer = new ResultStreamObserver({
@@ -104,7 +106,9 @@ export default class BoltProtocol extends BoltProtocolV3 {
104106
beforeError,
105107
afterError,
106108
beforeComplete,
107-
afterComplete
109+
afterComplete,
110+
highRecordWatermark,
111+
lowRecordWatermark
108112
})
109113

110114
// passing impersonated user on this protocol version throws an error

packages/bolt-connection/src/bolt/bolt-protocol-v4x4.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,9 @@ export default class BoltProtocol extends BoltProtocolV43 {
8484
afterComplete,
8585
flush = true,
8686
reactive = false,
87-
fetchSize = FETCH_ALL
87+
fetchSize = FETCH_ALL,
88+
highRecordWatermark = Number.MAX_VALUE,
89+
lowRecordWatermark = Number.MAX_VALUE
8890
} = {}
8991
) {
9092
const observer = new ResultStreamObserver({
@@ -98,7 +100,9 @@ export default class BoltProtocol extends BoltProtocolV43 {
98100
beforeError,
99101
afterError,
100102
beforeComplete,
101-
afterComplete
103+
afterComplete,
104+
highRecordWatermark,
105+
lowRecordWatermark
102106
})
103107

104108
const flushRun = reactive

packages/bolt-connection/src/bolt/stream-observers.js

Lines changed: 50 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ class ResultStreamObserver extends StreamObserver {
6868
afterKeys,
6969
beforeComplete,
7070
afterComplete,
71-
server
71+
server,
72+
highRecordWatermark = Number.MAX_VALUE,
73+
lowRecordWatermark = Number.MAX_VALUE
7274
} = {}) {
7375
super()
7476

@@ -94,8 +96,32 @@ class ResultStreamObserver extends StreamObserver {
9496
this._discardFunction = discardFunction
9597
this._discard = false
9698
this._fetchSize = fetchSize
99+
this._lowRecordWatermark = lowRecordWatermark
100+
this._highRecordWatermark = highRecordWatermark
97101
this._setState(reactive ? _states.READY : _states.READY_STREAMING)
98-
this._setupAuoPull(fetchSize)
102+
this._setupAutoPull()
103+
this._paused = false;
104+
}
105+
106+
/**
107+
* Pause the record consuming
108+
*
109+
* This function will supend the record consuming. It will not cancel the stream and the already
110+
* requested records will be sent to the subscriber.
111+
*/
112+
pause () {
113+
this._paused = true
114+
}
115+
116+
/**
117+
* Resume the record consuming
118+
*
119+
* This function will resume the record consuming fetching more records from the server.
120+
*/
121+
resume () {
122+
this._paused = false
123+
this._setupAutoPull(true)
124+
this._state.pull(this)
99125
}
100126

101127
/**
@@ -342,16 +368,21 @@ class ResultStreamObserver extends StreamObserver {
342368

343369
_handleStreaming () {
344370
if (this._head && this._observers.some(o => o.onNext || o.onCompleted)) {
345-
if (this._discard) {
346-
this._discardFunction(this._queryId, this)
347-
this._setState(_states.STREAMING)
348-
} else if (this._autoPull) {
349-
this._moreFunction(this._queryId, this._fetchSize, this)
350-
this._setState(_states.STREAMING)
371+
if (!this._paused && (this._discard || this._autoPull)) {
372+
this._more()
351373
}
352374
}
353375
}
354376

377+
_more () {
378+
if (this._discard) {
379+
this._discardFunction(this._queryId, this)
380+
} else {
381+
this._moreFunction(this._queryId, this._fetchSize, this)
382+
}
383+
this._setState(_states.STREAMING)
384+
}
385+
355386
_storeMetadataForCompletion (meta) {
356387
const keys = Object.keys(meta)
357388
let index = keys.length
@@ -367,15 +398,8 @@ class ResultStreamObserver extends StreamObserver {
367398
this._state = state
368399
}
369400

370-
_setupAuoPull (fetchSize) {
401+
_setupAutoPull () {
371402
this._autoPull = true
372-
if (fetchSize === FETCH_ALL) {
373-
this._lowRecordWatermark = Number.MAX_VALUE // we shall always lower than this number to enable auto pull
374-
this._highRecordWatermark = Number.MAX_VALUE // we shall never reach this number to disable auto pull
375-
} else {
376-
this._lowRecordWatermark = 0.3 * fetchSize
377-
this._highRecordWatermark = 0.7 * fetchSize
378-
}
379403
}
380404
}
381405

@@ -575,7 +599,8 @@ const _states = {
575599
},
576600
name: () => {
577601
return 'READY_STREAMING'
578-
}
602+
},
603+
pull: () => {}
579604
},
580605
READY: {
581606
// reactive start state
@@ -590,7 +615,8 @@ const _states = {
590615
},
591616
name: () => {
592617
return 'READY'
593-
}
618+
},
619+
pull: streamObserver => streamObserver._more()
594620
},
595621
STREAMING: {
596622
onSuccess: (streamObserver, meta) => {
@@ -605,20 +631,23 @@ const _states = {
605631
},
606632
name: () => {
607633
return 'STREAMING'
608-
}
634+
},
635+
pull: () => {}
609636
},
610637
FAILED: {
611638
onError: error => {
612639
// more errors are ignored
613640
},
614641
name: () => {
615642
return 'FAILED'
616-
}
643+
},
644+
pull: () => {}
617645
},
618646
SUCCEEDED: {
619647
name: () => {
620648
return 'SUCCEEDED'
621-
}
649+
},
650+
pull: () => {}
622651
}
623652
}
624653

packages/bolt-connection/test/bolt/bolt-protocol-v1.test.js

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,4 +338,26 @@ describe('#unit BoltProtocolV1', () => {
338338
}
339339
)
340340
})
341+
342+
describe('watermarks', () => {
343+
it('.run() should configure watermarks', () => {
344+
const recorder = new utils.MessageRecordingConnection()
345+
const protocol = utils.spyProtocolWrite(
346+
new BoltProtocolV1(recorder, null, false)
347+
)
348+
349+
const query = 'RETURN $x, $y'
350+
const parameters = { x: 'x', y: 'y' }
351+
const observer = protocol.run(query, parameters, {
352+
bookmark: Bookmark.empty(),
353+
txConfig: TxConfig.empty(),
354+
mode: WRITE,
355+
lowRecordWatermark: 100,
356+
highRecordWatermark: 200,
357+
})
358+
359+
expect(observer._lowRecordWatermark).toEqual(100)
360+
expect(observer._highRecordWatermark).toEqual(200)
361+
})
362+
})
341363
})

packages/bolt-connection/test/bolt/bolt-protocol-v2.test.js

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,4 +92,23 @@ describe('#unit BoltProtocolV2', () => {
9292
})
9393
})
9494
})
95+
96+
describe('watermarks', () => {
97+
it('.run() should configure watermarks', () => {
98+
const recorder = new utils.MessageRecordingConnection()
99+
const protocol = utils.spyProtocolWrite(
100+
new BoltProtocolV2(recorder, null, false)
101+
)
102+
103+
const query = 'RETURN $x, $y'
104+
const parameters = { x: 'x', y: 'y' }
105+
const observer = protocol.run(query, parameters, {
106+
lowRecordWatermark: 100,
107+
highRecordWatermark: 200,
108+
})
109+
110+
expect(observer._lowRecordWatermark).toEqual(100)
111+
expect(observer._highRecordWatermark).toEqual(200)
112+
})
113+
})
95114
})

packages/bolt-connection/test/bolt/bolt-protocol-v3.test.js

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,27 @@ describe('#unit BoltProtocolV3', () => {
295295
}
296296
)
297297
})
298+
299+
describe('watermarks', () => {
300+
it('.run() should configure watermarks', () => {
301+
const recorder = new utils.MessageRecordingConnection()
302+
const protocol = utils.spyProtocolWrite(
303+
new BoltProtocolV3(recorder, null, false)
304+
)
305+
306+
const query = 'RETURN $x, $y'
307+
const parameters = { x: 'x', y: 'y' }
308+
const observer = protocol.run(query, parameters, {
309+
bookmark: Bookmark.empty(),
310+
txConfig: TxConfig.empty(),
311+
lowRecordWatermark: 100,
312+
highRecordWatermark: 200,
313+
})
314+
315+
expect(observer._lowRecordWatermark).toEqual(100)
316+
expect(observer._highRecordWatermark).toEqual(200)
317+
})
318+
})
298319
})
299320

300321
class SpiedBoltProtocolV3 extends BoltProtocolV3 {

packages/bolt-connection/test/bolt/bolt-protocol-v4x0.test.js

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,27 @@ describe('#unit BoltProtocolV4x0', () => {
214214
}
215215
)
216216
})
217+
218+
describe('watermarks', () => {
219+
it('.run() should configure watermarks', () => {
220+
const recorder = new utils.MessageRecordingConnection()
221+
const protocol = utils.spyProtocolWrite(
222+
new BoltProtocolV4x0(recorder, null, false)
223+
)
224+
225+
const query = 'RETURN $x, $y'
226+
const parameters = { x: 'x', y: 'y' }
227+
const observer = protocol.run(query, parameters, {
228+
bookmark: Bookmark.empty(),
229+
txConfig: TxConfig.empty(),
230+
lowRecordWatermark: 100,
231+
highRecordWatermark: 200,
232+
})
233+
234+
expect(observer._lowRecordWatermark).toEqual(100)
235+
expect(observer._highRecordWatermark).toEqual(200)
236+
})
237+
})
217238
})
218239

219240
class SpiedBoltProtocolV4x0 extends BoltProtocolV4x0 {

packages/bolt-connection/test/bolt/bolt-protocol-v4x1.test.js

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@
1919

2020
import BoltProtocolV4x1 from '../../src/bolt/bolt-protocol-v4x1'
2121
import utils from '../test-utils'
22+
import { internal } from 'neo4j-driver-core'
23+
24+
const {
25+
txConfig: { TxConfig },
26+
bookmark: { Bookmark }
27+
} = internal
2228

2329
describe('#unit BoltProtocolV4x1', () => {
2430
describe('Bolt v4.4', () => {
@@ -82,4 +88,25 @@ describe('#unit BoltProtocolV4x1', () => {
8288
}
8389
)
8490
})
91+
92+
describe('watermarks', () => {
93+
it('.run() should configure watermarks', () => {
94+
const recorder = new utils.MessageRecordingConnection()
95+
const protocol = utils.spyProtocolWrite(
96+
new BoltProtocolV4x1(recorder, null, false)
97+
)
98+
99+
const query = 'RETURN $x, $y'
100+
const parameters = { x: 'x', y: 'y' }
101+
const observer = protocol.run(query, parameters, {
102+
bookmark: Bookmark.empty(),
103+
txConfig: TxConfig.empty(),
104+
lowRecordWatermark: 100,
105+
highRecordWatermark: 200,
106+
})
107+
108+
expect(observer._lowRecordWatermark).toEqual(100)
109+
expect(observer._highRecordWatermark).toEqual(200)
110+
})
111+
})
85112
})

0 commit comments

Comments
 (0)