Skip to content

Commit adf9ac3

Browse files
committed
Emulating error in a unit test scenario
1 parent efc064d commit adf9ac3

File tree

4 files changed

+114
-20
lines changed

4 files changed

+114
-20
lines changed

packages/core/src/internal/observers.ts

+5-3
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ export interface ResultStreamObserver extends StreamObserver {
9191

9292
setExplicityPull(explicityPull: boolean): void
9393

94-
pull(): void
94+
pull(): boolean
9595

9696
/**
9797
* Mark this observer as if it has completed with no metadata.
@@ -131,8 +131,9 @@ export class CompletedObserver implements ResultStreamObserver {
131131
// do nothing
132132
}
133133

134-
pull(): void {
134+
pull(): boolean {
135135
// do nothing
136+
return false
136137
}
137138

138139
onError(error: Error): void {
@@ -186,8 +187,9 @@ export class FailedObserver implements ResultStreamObserver {
186187
// do nothing
187188
}
188189

189-
pull(): void {
190+
pull(): boolean {
190191
// do nothing
192+
return false
191193
}
192194

193195
}

packages/core/src/result.ts

+8-11
Original file line numberDiff line numberDiff line change
@@ -246,13 +246,9 @@ class Result implements Promise<QueryResult> {
246246

247247
const status = { paused: false }
248248

249-
let streaming: observer.ResultStreamObserver | null = null
250-
251-
try {
252-
streaming = await this._subscribe(queuedObserver, true)
253-
} catch (e) {
254-
// ignore, we will handle it in consume since the error is notifies in the onError callback
255-
}
249+
const streaming: observer.ResultStreamObserver | null =
250+
// the error will be send to the onError callback
251+
await this._subscribe(queuedObserver, true).catch(() => null)
256252

257253
const pullIfNeeded = () => {
258254
if (queuedObserver.size >= this._watermarks.high) {
@@ -466,15 +462,16 @@ class Result implements Promise<QueryResult> {
466462

467463
const observer = {
468464
_buffer: [createResolvablePromise()],
465+
_completedCalls: 0,
469466
onNext: (record: Record) => {
470-
observer._buffer[observer._buffer.length - 1].resolve({ done: false, record });
471-
observer._buffer.push(createResolvablePromise());
467+
observer._buffer[observer._buffer.length - 1].resolve({ done: false, record })
468+
observer._buffer.push(createResolvablePromise())
472469
},
473470
onCompleted: (summary: ResultSummary) => {
474-
observer._buffer[observer._buffer.length - 1].resolve({ done: true, summary });
471+
observer._buffer[observer._buffer.length - 1].resolve({ done: true, summary })
475472
},
476473
onError: (error: Error) => {
477-
observer._buffer[observer._buffer.length - 1].reject(error);
474+
observer._buffer[observer._buffer.length - 1].reject(error)
478475
},
479476
dequeue: async () => {
480477
const value = await observer._buffer[0].promise

packages/core/test/result.test.ts

+100-5
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import { observer, connectionHolder } from '../src/internal'
2020
import {
2121
Connection,
22+
internal,
2223
newError,
2324
Record,
2425
ResultObserver,
@@ -694,6 +695,56 @@ describe('Result', () => {
694695
])
695696
})
696697

698+
it('should end full batch', async () => {
699+
const fetchSize = 3
700+
const observer = new ResultStreamObserverMock()
701+
const res = new Result(
702+
Promise.resolve(observer),
703+
'query', undefined, undefined,
704+
{
705+
low: fetchSize * 0.3, // Same as calculate in the session.ts
706+
high: fetchSize * 0.7
707+
}
708+
)
709+
710+
const keys = ['a', 'b']
711+
const rawRecord1 = [1, 2]
712+
const rawRecord2 = [3, 4]
713+
const rawRecord3 = [5, 6]
714+
const rawRecord4 = [7, 8]
715+
const rawRecord5 = [9, 10]
716+
const rawRecord6 = [11, 12]
717+
const queue: any[][] = [
718+
rawRecord3,
719+
rawRecord4,
720+
rawRecord5,
721+
rawRecord6
722+
]
723+
724+
jest.spyOn(observer, 'pull')
725+
.mockImplementation(simulatePull(queue, observer, fetchSize, 2))
726+
727+
observer.onKeys(keys)
728+
observer.onNext(rawRecord1)
729+
observer.onNext(rawRecord2)
730+
731+
const records = []
732+
733+
for await (const record of res) {
734+
records.push(record)
735+
await new Promise(r => setTimeout(r, 0.1))
736+
}
737+
738+
expect(records).toEqual([
739+
new Record(keys, rawRecord1),
740+
new Record(keys, rawRecord2),
741+
new Record(keys, rawRecord3),
742+
new Record(keys, rawRecord4),
743+
new Record(keys, rawRecord5),
744+
new Record(keys, rawRecord6)
745+
])
746+
})
747+
697748
describe('onError', () => {
698749
it('should throws an exception while iterate over records', async () => {
699750
const keys = ['a', 'b']
@@ -898,14 +949,14 @@ describe('Result', () => {
898949
describe.each([
899950
[
900951
'Promise.resolve(new observer.FailedObserver({ error: expectedError }))',
901-
Promise.resolve(new observer.FailedObserver({ error: expectedError }))
952+
() => Promise.resolve(new observer.FailedObserver({ error: expectedError }))
902953
],
903-
['Promise.reject(expectedError)', Promise.reject(expectedError)]
904-
])('new Result(%s, "query") ', (_, promise) => {
954+
['Promise.reject(expectedError)', () => Promise.reject(expectedError)]
955+
])('new Result(%s, "query") ', (_, getPromise) => {
905956
let result: Result
906957

907958
beforeEach(() => {
908-
result = new Result(promise, 'query')
959+
result = new Result(getPromise(), 'query')
909960
})
910961

911962
describe('.keys()', () => {
@@ -1033,8 +1084,52 @@ class ResultStreamObserverMock implements observer.ResultStreamObserver {
10331084
// do nothing
10341085
}
10351086

1036-
pull(): void {
1087+
pull(): boolean {
10371088
// do nothing
1089+
return true
1090+
}
1091+
}
1092+
1093+
function simulatePull(
1094+
records: any[][],
1095+
observer: ResultStreamObserverMock,
1096+
fetchSize: number,
1097+
timeout: number = 1): () => boolean {
1098+
const state = {
1099+
streaming: false,
1100+
finished: false,
1101+
consumed: 0
1102+
}
1103+
return () => {
1104+
1105+
if (state.streaming || state.finished) {
1106+
return false
1107+
}
1108+
state.streaming = true
1109+
state.consumed = 0
1110+
const interval = setInterval(() => {
1111+
state.streaming = state.consumed < fetchSize
1112+
state.finished = records.length === 0
1113+
1114+
if (state.finished) {
1115+
observer.onCompleted({})
1116+
clearInterval(interval)
1117+
return
1118+
}
1119+
1120+
if (!state.streaming) {
1121+
clearInterval(interval)
1122+
return
1123+
}
1124+
1125+
const record = records.shift()
1126+
if (record !== undefined) {
1127+
observer.onNext(record)
1128+
}
1129+
state.consumed++
1130+
1131+
}, timeout)
1132+
return true
10381133
}
10391134
}
10401135

packages/core/test/utils/connection.fake.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ function mockResultStreamObserver(query: string, parameters: any | undefined): R
174174
onNext: (result: any) => { },
175175
cancel: () => { },
176176
prepareToHandleSingleResponse: () => { },
177-
pull: () => { },
177+
pull: () => { return true },
178178
markCompleted: () => { },
179179
setExplicityPull: (_: boolean) => { },
180180
subscribe: (observer: ResultObserver) => {

0 commit comments

Comments
 (0)