Skip to content

Commit ba89fe3

Browse files
committed
Changing pull for pause and resume
1 parent adf9ac3 commit ba89fe3

File tree

6 files changed

+188
-84
lines changed

6 files changed

+188
-84
lines changed

packages/bolt-connection/src/bolt/stream-observers.js

+19-6
Original file line numberDiff line numberDiff line change
@@ -100,15 +100,28 @@ class ResultStreamObserver extends StreamObserver {
100100
this._highRecordWatermark = highRecordWatermark
101101
this._setState(reactive ? _states.READY : _states.READY_STREAMING)
102102
this._setupAutoPull()
103-
this._explicityPull = false;
103+
this._paused = false;
104104
}
105105

106-
setExplicityPull(explicityPull) {
107-
this._explicityPull = explicityPull;
106+
/**
107+
* Pause the record consuming
108+
*
109+
* This function will supend the record consuming. It will not cancel the stream and the already
110+
* requested records will be sent to the subscriber.
111+
*/
112+
pause () {
113+
this._paused = true
108114
}
109115

110-
pull() {
111-
return this._state.pull(this)
116+
/**
117+
* Resume the record consuming
118+
*
119+
* This function will resume the record consuming fetching more records from the server.
120+
*/
121+
resume () {
122+
this._paused = false
123+
this._setupAutoPull(true)
124+
this._state.pull(this)
112125
}
113126

114127
/**
@@ -355,7 +368,7 @@ class ResultStreamObserver extends StreamObserver {
355368

356369
_handleStreaming () {
357370
if (this._head && this._observers.some(o => o.onNext || o.onCompleted)) {
358-
if (!this._explicityPull && (this._discard || this._autoPull)) {
371+
if (!this._paused && (this._discard || this._autoPull)) {
359372
this._more()
360373
}
361374
}

packages/bolt-connection/test/bolt/stream-observer.test.js

+64-21
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ describe('#unit ResultStreamObserver', () => {
199199
})
200200
})
201201

202-
describe('when is not explicity pull (default)', () => {
202+
describe('when is not paused (default)', () => {
203203
it('should ask for more records when the stream is completed and has more', () => {
204204
// Setup
205205
const queryId = 123
@@ -229,7 +229,7 @@ describe('#unit ResultStreamObserver', () => {
229229
})
230230
})
231231

232-
describe('when is explicity pull enabled', () => {
232+
describe('when is paused', () => {
233233
it('should not ask for more records when the stream is completed and has more', () => {
234234
// Setup
235235
const queryId = 123
@@ -239,7 +239,8 @@ describe('#unit ResultStreamObserver', () => {
239239
moreFunction: more,
240240
fetchSize: 2000
241241
})
242-
streamObserver.setExplicityPull(true)
242+
243+
streamObserver.pause()
243244

244245
// action
245246
streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId })
@@ -254,7 +255,7 @@ describe('#unit ResultStreamObserver', () => {
254255
expect(more).toBeCalledTimes(0)
255256
})
256257

257-
describe('pull()', () => {
258+
describe('resume()', () => {
258259
it('should ask for more records when the stream is completed and has more', () => {
259260
// Setup
260261
const queryId = 123
@@ -265,7 +266,8 @@ describe('#unit ResultStreamObserver', () => {
265266
moreFunction: more,
266267
fetchSize: fetchSize
267268
})
268-
streamObserver.setExplicityPull(true)
269+
270+
streamObserver.pause()
269271

270272
// Scenario
271273
streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId })
@@ -277,7 +279,7 @@ describe('#unit ResultStreamObserver', () => {
277279
streamObserver.onCompleted({ has_more: true })
278280

279281
// Action
280-
streamObserver.pull()
282+
streamObserver.resume()
281283

282284
// verification
283285
expect(more).toBeCalledTimes(1)
@@ -295,13 +297,13 @@ describe('#unit ResultStreamObserver', () => {
295297
fetchSize: fetchSize,
296298
reactive: true
297299
})
298-
streamObserver.setExplicityPull(true)
300+
streamObserver.pause()
299301

300302
// Scenario
301303
streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId })
302304

303305
// Action
304-
streamObserver.pull()
306+
streamObserver.resume()
305307

306308
// verification
307309
expect(more).toBeCalledTimes(1)
@@ -320,10 +322,10 @@ describe('#unit ResultStreamObserver', () => {
320322
fetchSize: fetchSize,
321323
reactive: true
322324
})
323-
streamObserver.setExplicityPull(true)
325+
streamObserver.pause()
324326

325327
// Action
326-
streamObserver.pull()
328+
streamObserver.resume()
327329

328330
// verification
329331
expect(more).toBeCalledTimes(1)
@@ -341,13 +343,13 @@ describe('#unit ResultStreamObserver', () => {
341343
fetchSize: fetchSize,
342344
reactive: false
343345
})
344-
streamObserver.setExplicityPull(true)
346+
streamObserver.pause()
345347

346348
// Scenario
347349
streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId })
348350

349351
// Action
350-
streamObserver.pull()
352+
streamObserver.resume()
351353

352354
// verification
353355
expect(more).toBeCalledTimes(0)
@@ -365,13 +367,13 @@ describe('#unit ResultStreamObserver', () => {
365367
fetchSize: fetchSize
366368
})
367369

368-
streamObserver.setExplicityPull(true)
370+
streamObserver.pause()
369371

370372
// Scenario
371373
streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId })
372374

373375
// Action
374-
streamObserver.pull()
376+
streamObserver.resume()
375377

376378
// verification
377379
expect(more).toBeCalledTimes(0)
@@ -388,7 +390,7 @@ describe('#unit ResultStreamObserver', () => {
388390
fetchSize: fetchSize
389391
})
390392

391-
streamObserver.setExplicityPull(true)
393+
streamObserver.pause()
392394

393395
// Scenario
394396
streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId })
@@ -399,12 +401,12 @@ describe('#unit ResultStreamObserver', () => {
399401
streamObserver.onNext([11, 22, 33])
400402
streamObserver.onCompleted({ has_more: true })
401403

402-
streamObserver.pull() // should actual call
404+
streamObserver.resume() // should actual call
403405

404406
streamObserver.onNext([111, 222, 333])
405407

406408
// Action
407-
streamObserver.pull()
409+
streamObserver.resume()
408410

409411
// verification
410412
expect(more).toBeCalledTimes(1)
@@ -421,7 +423,7 @@ describe('#unit ResultStreamObserver', () => {
421423
fetchSize: fetchSize
422424
})
423425

424-
streamObserver.setExplicityPull(true)
426+
streamObserver.pause()
425427

426428
// Scenario
427429
streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId })
@@ -433,12 +435,53 @@ describe('#unit ResultStreamObserver', () => {
433435
streamObserver.onCompleted({ has_more: false })
434436

435437
// Action
436-
streamObserver.pull()
438+
streamObserver.resume()
437439

438440
// verification
439441
expect(more).toBeCalledTimes(0)
440442
})
441443

444+
445+
it('should resume the stream consumption until the end', () => {
446+
// Setup
447+
const queryId = 123
448+
const fetchSize = 2000
449+
450+
const more = jest.fn()
451+
const streamObserver = new ResultStreamObserver({
452+
moreFunction: more,
453+
fetchSize: fetchSize
454+
})
455+
456+
streamObserver.pause()
457+
458+
// Scenario
459+
streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId })
460+
461+
streamObserver.subscribe(newObserver())
462+
463+
streamObserver.onNext([1, 2, 3])
464+
streamObserver.onNext([11, 22, 33])
465+
streamObserver.onCompleted({ has_more: true })
466+
467+
// Action
468+
streamObserver.resume()
469+
470+
// Streaming until the end
471+
streamObserver.onNext([1, 2, 3])
472+
streamObserver.onNext([11, 22, 33])
473+
streamObserver.onCompleted({ has_more: true })
474+
streamObserver.onNext([1, 2, 3])
475+
streamObserver.onNext([11, 22, 33])
476+
streamObserver.onCompleted({ has_more: true })
477+
streamObserver.onNext([1, 2, 3])
478+
streamObserver.onNext([11, 22, 33])
479+
streamObserver.onCompleted({ has_more: false })
480+
481+
// verification
482+
expect(more).toBeCalledTimes(3)
483+
})
484+
442485
it('should not ask for more records when stream failed', () => {
443486
// Setup
444487
const queryId = 123
@@ -450,7 +493,7 @@ describe('#unit ResultStreamObserver', () => {
450493
fetchSize: fetchSize
451494
})
452495

453-
streamObserver.setExplicityPull(true)
496+
streamObserver.pause()
454497

455498
// Scenario
456499
streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId })
@@ -461,7 +504,7 @@ describe('#unit ResultStreamObserver', () => {
461504
streamObserver.onError(new Error('error'))
462505

463506
// Action
464-
streamObserver.pull()
507+
streamObserver.resume()
465508

466509
// verification
467510
expect(more).toBeCalledTimes(0)

packages/core/src/internal/observers.ts

+24-14
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,22 @@ export interface ResultStreamObserver extends StreamObserver {
7777
* Cancel pending record stream
7878
*/
7979
cancel(): void
80+
81+
/**
82+
* Pause the record consuming
83+
*
84+
* This function will supend the record consuming. It will not cancel the stream and the already
85+
* requested records will be sent to the subscriber.
86+
*/
87+
pause(): void
88+
89+
/**
90+
* Resume the record consuming
91+
*
92+
* This function will resume the record consuming fetching more records from the server.
93+
*/
94+
resume(): void
95+
8096
/**
8197
* Stream observer defaults to handling responses for two messages: RUN + PULL_ALL or RUN + DISCARD_ALL.
8298
* Response for RUN initializes query keys. Response for PULL_ALL / DISCARD_ALL exposes the result stream.
@@ -89,10 +105,6 @@ export interface ResultStreamObserver extends StreamObserver {
89105
*/
90106
prepareToHandleSingleResponse(): void
91107

92-
setExplicityPull(explicityPull: boolean): void
93-
94-
pull(): boolean
95-
96108
/**
97109
* Mark this observer as if it has completed with no metadata.
98110
*/
@@ -119,21 +131,20 @@ export class CompletedObserver implements ResultStreamObserver {
119131
// do nothing
120132
}
121133

122-
prepareToHandleSingleResponse(): void {
134+
pause(): void {
123135
// do nothing
124136
}
125137

126-
markCompleted(): void {
138+
resume(): void {
127139
// do nothing
128140
}
129141

130-
setExplicityPull(_: boolean): void {
142+
prepareToHandleSingleResponse(): void {
131143
// do nothing
132144
}
133145

134-
pull(): boolean {
146+
markCompleted(): void {
135147
// do nothing
136-
return false
137148
}
138149

139150
onError(error: Error): void {
@@ -175,21 +186,20 @@ export class FailedObserver implements ResultStreamObserver {
175186
// do nothing
176187
}
177188

178-
prepareToHandleSingleResponse(): void {
189+
pause(): void {
179190
// do nothing
180191
}
181192

182-
markCompleted(): void {
193+
resume(): void {
183194
// do nothing
184195
}
185196

186-
setExplicityPull(_: boolean): void {
197+
markCompleted(): void {
187198
// do nothing
188199
}
189200

190-
pull(): boolean {
201+
prepareToHandleSingleResponse(): void {
191202
// do nothing
192-
return false
193203
}
194204

195205
}

0 commit comments

Comments
 (0)