diff --git a/packages/bolt-connection/src/bolt/stream-observers.js b/packages/bolt-connection/src/bolt/stream-observers.js index 518281dd4..0016f3e9b 100644 --- a/packages/bolt-connection/src/bolt/stream-observers.js +++ b/packages/bolt-connection/src/bolt/stream-observers.js @@ -202,10 +202,6 @@ class ResultStreamObserver extends StreamObserver { * @param {function(error: Object)} observer.onError - Handle errors, should always be provided. */ subscribe (observer) { - if (this._error) { - observer.onError(this._error) - return - } if (this._head && observer.onKeys) { observer.onKeys(this._head) } @@ -223,6 +219,9 @@ class ResultStreamObserver extends StreamObserver { if (this._tail && observer.onCompleted) { observer.onCompleted(this._tail) } + if (this._error) { + observer.onError(this._error) + } this._observers.push(observer) if (this._state === _states.READY) { diff --git a/packages/bolt-connection/test/bolt/stream-observer.test.js b/packages/bolt-connection/test/bolt/stream-observer.test.js index 13eeb24a6..e637c15fa 100644 --- a/packages/bolt-connection/test/bolt/stream-observer.test.js +++ b/packages/bolt-connection/test/bolt/stream-observer.test.js @@ -199,6 +199,130 @@ describe('#unit ResultStreamObserver', () => { }) }) + it('should inform all the pre-existing events of a success stream to the subscriber', () => { + const streamObserver = new ResultStreamObserver() + const received = { + onCompleted: [], + onError: [], + onNext: [], + onKeys: [] + } + const observer = { + onCompleted: metadata => received.onCompleted.push(metadata), + onError: error => received.onError.push(error), + onNext: record => received.onNext.push(record), + onKeys: keys => received.onKeys.push(keys) + } + + streamObserver.onCompleted({ fields: ['A', 'B', 'C'] }) + + streamObserver.onNext([1, 2, 3]) + streamObserver.onNext([11, 22, 33]) + streamObserver.onNext([111, 222, 333]) + + streamObserver.onCompleted({ key: 42, has_more: false }) + + streamObserver.subscribe(observer) + + expect(received.onNext.length).toEqual(3) + expect(received.onNext[0].toObject()).toEqual({ A: 1, B: 2, C: 3 }) + expect(received.onNext[1].toObject()).toEqual({ A: 11, B: 22, C: 33 }) + expect(received.onNext[2].toObject()).toEqual({ A: 111, B: 222, C: 333 }) + expect(received.onKeys).toEqual([['A', 'B', 'C']]) + expect(received.onCompleted).toEqual([{ key: 42, has_more: false }]) + expect(received.onError).toEqual([]) + }) + + it('should inform all the pre-existing events of a success stream to the subscriber in the correct order', () => { + const streamObserver = new ResultStreamObserver() + const received = [] + const observer = { + onCompleted: metadata => received.push(metadata), + onError: error => received.push(error), + onNext: record => received.push(record), + onKeys: keys => received.push(keys) + } + + streamObserver.onCompleted({ fields: ['A', 'B', 'C'] }) + + streamObserver.onNext([1, 2, 3]) + streamObserver.onNext([11, 22, 33]) + streamObserver.onNext([111, 222, 333]) + + streamObserver.onCompleted({ key: 42, has_more: false }) + + streamObserver.subscribe(observer) + + expect(received.length).toEqual(5) + expect(received[0]).toEqual(['A', 'B', 'C']) + expect(received[1].toObject()).toEqual({ A: 1, B: 2, C: 3 }) + expect(received[2].toObject()).toEqual({ A: 11, B: 22, C: 33 }) + expect(received[3].toObject()).toEqual({ A: 111, B: 222, C: 333 }) + expect(received[4]).toEqual({ key: 42, has_more: false }) + }) + + it('should inform all the pre-existing events of an error stream to the subscriber', () => { + const streamObserver = new ResultStreamObserver() + const received = { + onCompleted: [], + onError: [], + onNext: [], + onKeys: [] + } + const observer = { + onCompleted: metadata => received.onCompleted.push(metadata), + onError: error => received.onError.push(error), + onNext: record => received.onNext.push(record), + onKeys: keys => received.onKeys.push(keys) + } + + streamObserver.onCompleted({ fields: ['A', 'B', 'C'] }) + + streamObserver.onNext([1, 2, 3]) + streamObserver.onNext([11, 22, 33]) + streamObserver.onNext([111, 222, 333]) + + streamObserver.onError(newError('something is on the way')) + + streamObserver.subscribe(observer) + + expect(received.onNext.length).toEqual(3) + expect(received.onNext[0].toObject()).toEqual({ A: 1, B: 2, C: 3 }) + expect(received.onNext[1].toObject()).toEqual({ A: 11, B: 22, C: 33 }) + expect(received.onNext[2].toObject()).toEqual({ A: 111, B: 222, C: 333 }) + expect(received.onKeys).toEqual([['A', 'B', 'C']]) + expect(received.onCompleted).toEqual([]) + expect(received.onError).toEqual([newError('something is on the way')]) + }) + + it('should inform all the pre-existing events of an error stream stream to the subscriber in the correct order', () => { + const streamObserver = new ResultStreamObserver() + const received = [] + const observer = { + onCompleted: metadata => received.push(metadata), + onError: error => received.push(error), + onNext: record => received.push(record), + onKeys: keys => received.push(keys) + } + + streamObserver.onCompleted({ fields: ['A', 'B', 'C'] }) + + streamObserver.onNext([1, 2, 3]) + streamObserver.onNext([11, 22, 33]) + streamObserver.onNext([111, 222, 333]) + + streamObserver.onError(newError('something is on the way')) + + streamObserver.subscribe(observer) + + expect(received.length).toEqual(5) + expect(received[0]).toEqual(['A', 'B', 'C']) + expect(received[1].toObject()).toEqual({ A: 1, B: 2, C: 3 }) + expect(received[2].toObject()).toEqual({ A: 11, B: 22, C: 33 }) + expect(received[3].toObject()).toEqual({ A: 111, B: 222, C: 333 }) + expect(received[4]).toEqual(newError('something is on the way')) + }) + describe('when is not paused (default)', () => { it('should ask for more records when the stream is completed and has more', () => { // Setup