Skip to content

Commit 70bc926

Browse files
committed
Add tests to ResultStreamObserver.pull() and change StreamObserver.setPullMode to setExplicityPull
1 parent b3ce989 commit 70bc926

File tree

5 files changed

+260
-12
lines changed

5 files changed

+260
-12
lines changed

packages/bolt-connection/src/bolt/stream-observers.js

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,12 +99,12 @@ class ResultStreamObserver extends StreamObserver {
9999
this._lowRecordWatermark = lowRecordWatermark
100100
this._highRecordWatermark = highRecordWatermark
101101
this._setState(reactive ? _states.READY : _states.READY_STREAMING)
102-
this._setupAuoPull()
103-
this._pullMode = false;
102+
this._setupAutoPull()
103+
this._explicityPull = false;
104104
}
105105

106-
setPullMode(pullMode) {
107-
this._pullMode = pullMode;
106+
setExplicityPull(explicityPull) {
107+
this._explicityPull = explicityPull;
108108
}
109109

110110
pull() {
@@ -355,7 +355,7 @@ class ResultStreamObserver extends StreamObserver {
355355

356356
_handleStreaming () {
357357
if (this._head && this._observers.some(o => o.onNext || o.onCompleted)) {
358-
if (!this._pullMode && (this._discard || this._autoPull)) {
358+
if (!this._explicityPull && (this._discard || this._autoPull)) {
359359
this._more()
360360
}
361361
}
@@ -385,7 +385,7 @@ class ResultStreamObserver extends StreamObserver {
385385
this._state = state
386386
}
387387

388-
_setupAuoPull () {
388+
_setupAutoPull () {
389389
this._autoPull = true
390390
}
391391
}

packages/bolt-connection/test/bolt/stream-observer.test.js

Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,254 @@ describe('#unit ResultStreamObserver', () => {
198198
}
199199
})
200200
})
201+
202+
describe('when is not explicity pull (default)', () => {
203+
it('should ask for more records when the stream is completed and has more', () => {
204+
// Setup
205+
const queryId = 123
206+
const fetchSize = 2000
207+
208+
const more = jest.fn()
209+
const streamObserver = new ResultStreamObserver({
210+
moreFunction: more,
211+
fetchSize: 2000
212+
})
213+
214+
// action
215+
streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId })
216+
217+
streamObserver.subscribe(newObserver())
218+
219+
streamObserver.onNext([1, 2, 3])
220+
streamObserver.onNext([11, 22, 33])
221+
streamObserver.onCompleted({ has_more: true })
222+
223+
streamObserver.onNext([111, 222, 333])
224+
streamObserver.onCompleted({ has_more: false })
225+
226+
// verification
227+
expect(more).toBeCalledTimes(1)
228+
expect(more).toBeCalledWith(queryId, fetchSize, streamObserver)
229+
})
230+
})
231+
232+
describe('when is explicity pull enabled', () => {
233+
it('should not ask for more records when the stream is completed and has more', () => {
234+
// Setup
235+
const queryId = 123
236+
237+
const more = jest.fn()
238+
const streamObserver = new ResultStreamObserver({
239+
moreFunction: more,
240+
fetchSize: 2000
241+
})
242+
streamObserver.setExplicityPull(true)
243+
244+
// action
245+
streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId })
246+
247+
streamObserver.subscribe(newObserver())
248+
249+
streamObserver.onNext([1, 2, 3])
250+
streamObserver.onNext([11, 22, 33])
251+
streamObserver.onCompleted({ has_more: true })
252+
253+
// verification
254+
expect(more).toBeCalledTimes(0)
255+
})
256+
257+
describe('pull()', () => {
258+
it('should ask for more records when the stream is completed and has more', () => {
259+
// Setup
260+
const queryId = 123
261+
const fetchSize = 2000
262+
263+
const more = jest.fn()
264+
const streamObserver = new ResultStreamObserver({
265+
moreFunction: more,
266+
fetchSize: fetchSize
267+
})
268+
streamObserver.setExplicityPull(true)
269+
270+
// Scenario
271+
streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId })
272+
273+
streamObserver.subscribe(newObserver())
274+
275+
streamObserver.onNext([1, 2, 3])
276+
streamObserver.onNext([11, 22, 33])
277+
streamObserver.onCompleted({ has_more: true })
278+
279+
// Action
280+
streamObserver.pull()
281+
282+
// verification
283+
expect(more).toBeCalledTimes(1)
284+
expect(more).toBeCalledWith(queryId, fetchSize, streamObserver)
285+
})
286+
287+
it('should ask for more records when the stream is a new reactive stream', () => {
288+
// Setup
289+
const queryId = 123
290+
const fetchSize = 2000
291+
292+
const more = jest.fn()
293+
const streamObserver = new ResultStreamObserver({
294+
moreFunction: more,
295+
fetchSize: fetchSize,
296+
reactive: true
297+
})
298+
streamObserver.setExplicityPull(true)
299+
300+
// Scenario
301+
streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId })
302+
303+
// Action
304+
streamObserver.pull()
305+
306+
// verification
307+
expect(more).toBeCalledTimes(1)
308+
expect(more).toBeCalledWith(queryId, fetchSize, streamObserver)
309+
})
310+
311+
it('should not ask for more records when the stream is a new stream', () => {
312+
// Setup
313+
const queryId = 123
314+
const fetchSize = 2000
315+
316+
const more = jest.fn()
317+
const streamObserver = new ResultStreamObserver({
318+
moreFunction: more,
319+
fetchSize: fetchSize,
320+
reactive: false
321+
})
322+
streamObserver.setExplicityPull(true)
323+
324+
// Scenario
325+
streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId })
326+
327+
// Action
328+
streamObserver.pull()
329+
330+
// verification
331+
expect(more).toBeCalledTimes(0)
332+
})
333+
334+
335+
it('should not ask for more records when the stream is a new stream', () => {
336+
// Setup
337+
const queryId = 123
338+
const fetchSize = 2000
339+
340+
const more = jest.fn()
341+
const streamObserver = new ResultStreamObserver({
342+
moreFunction: more,
343+
fetchSize: fetchSize
344+
})
345+
346+
streamObserver.setExplicityPull(true)
347+
348+
// Scenario
349+
streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId })
350+
351+
// Action
352+
streamObserver.pull()
353+
354+
// verification
355+
expect(more).toBeCalledTimes(0)
356+
})
357+
358+
it('should not ask for more records when it is streaming', () => {
359+
// Setup
360+
const queryId = 123
361+
const fetchSize = 2000
362+
363+
const more = jest.fn()
364+
const streamObserver = new ResultStreamObserver({
365+
moreFunction: more,
366+
fetchSize: fetchSize
367+
})
368+
369+
streamObserver.setExplicityPull(true)
370+
371+
// Scenario
372+
streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId })
373+
374+
streamObserver.subscribe(newObserver())
375+
376+
streamObserver.onNext([1, 2, 3])
377+
streamObserver.onNext([11, 22, 33])
378+
streamObserver.onCompleted({ has_more: true })
379+
380+
streamObserver.pull() // should actual call
381+
382+
streamObserver.onNext([111, 222, 333])
383+
384+
// Action
385+
streamObserver.pull()
386+
387+
// verification
388+
expect(more).toBeCalledTimes(1)
389+
})
390+
391+
it('should not ask for more records when result is completed', () => {
392+
// Setup
393+
const queryId = 123
394+
const fetchSize = 2000
395+
396+
const more = jest.fn()
397+
const streamObserver = new ResultStreamObserver({
398+
moreFunction: more,
399+
fetchSize: fetchSize
400+
})
401+
402+
streamObserver.setExplicityPull(true)
403+
404+
// Scenario
405+
streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId })
406+
407+
streamObserver.subscribe(newObserver())
408+
409+
streamObserver.onNext([1, 2, 3])
410+
streamObserver.onNext([11, 22, 33])
411+
streamObserver.onCompleted({ has_more: false })
412+
413+
// Action
414+
streamObserver.pull()
415+
416+
// verification
417+
expect(more).toBeCalledTimes(0)
418+
})
419+
420+
it('should not ask for more records when stream failed', () => {
421+
// Setup
422+
const queryId = 123
423+
const fetchSize = 2000
424+
425+
const more = jest.fn()
426+
const streamObserver = new ResultStreamObserver({
427+
moreFunction: more,
428+
fetchSize: fetchSize
429+
})
430+
431+
streamObserver.setExplicityPull(true)
432+
433+
// Scenario
434+
streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId })
435+
436+
streamObserver.subscribe(newObserver())
437+
438+
streamObserver.onNext([1, 2, 3])
439+
streamObserver.onError(new Error('error'))
440+
441+
// Action
442+
streamObserver.pull()
443+
444+
// verification
445+
expect(more).toBeCalledTimes(0)
446+
})
447+
})
448+
})
201449
})
202450

203451
describe('#unit RouteObserver', () => {

packages/core/src/internal/observers.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ export interface ResultStreamObserver extends StreamObserver {
8989
*/
9090
prepareToHandleSingleResponse(): void
9191

92-
setPullMode(pullMode: boolean): void
92+
setExplicityPull(explicityPull: boolean): void
9393

9494
pull(): void
9595

@@ -127,7 +127,7 @@ export class CompletedObserver implements ResultStreamObserver {
127127
// do nothing
128128
}
129129

130-
setPullMode(_: boolean): void {
130+
setExplicityPull(_: boolean): void {
131131
// do nothing
132132
}
133133

@@ -182,7 +182,7 @@ export class FailedObserver implements ResultStreamObserver {
182182
// do nothing
183183
}
184184

185-
setPullMode(_: boolean): void {
185+
setExplicityPull(_: boolean): void {
186186
// do nothing
187187
}
188188

packages/core/src/result.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -336,12 +336,12 @@ class Result implements Promise<QueryResult> {
336336
.catch(() => {})
337337
}
338338

339-
_subscribe(observer: ResultObserver, pullMode: boolean = false): Promise<observer.ResultStreamObserver> {
339+
_subscribe(observer: ResultObserver, explicityPull: boolean = false): Promise<observer.ResultStreamObserver> {
340340
const _observer = this._decorateObserver(observer)
341341

342342
return this._streamObserverPromise
343343
.then(o => {
344-
o.setPullMode(pullMode)
344+
o.setExplicityPull(explicityPull)
345345
o.subscribe(_observer)
346346
return o
347347
})

packages/core/test/result.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -766,7 +766,7 @@ class ResultStreamObserverMock implements observer.ResultStreamObserver {
766766
.forEach(o => o.onCompleted!(meta))
767767
}
768768

769-
setPullMode(_: boolean): void {
769+
setExplicityPull(_: boolean): void {
770770
// do nothing
771771
}
772772

0 commit comments

Comments
 (0)