diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index 302f7d56c..062f2f079 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -115,6 +115,7 @@ class Result implements Promise { private _connectionHolder: connectionHolder.ConnectionHolder private _keys: string[] | null private _summary: ResultSummary | null + private _error: Error | null private _watermarks: { high: number; low: number } /** @@ -141,6 +142,7 @@ class Result implements Promise { this._connectionHolder = connectionHolder || EMPTY_CONNECTION_HOLDER this._keys = null this._summary = null + this._error = null this._watermarks = watermarks } @@ -154,8 +156,10 @@ class Result implements Promise { } */ keys(): Promise { - if (this._keys != null) { + if (this._keys !== null) { return Promise.resolve(this._keys) + } else if (this._error !== null) { + return Promise.reject(this._error) } return new Promise((resolve, reject) => { this._streamObserverPromise @@ -179,8 +183,10 @@ class Result implements Promise { * */ summary(): Promise { - if (this._summary != null) { + if (this._summary !== null) { return Promise.resolve(this._summary) + } else if (this._error !== null) { + return Promise.reject(this._error) } return new Promise((resolve, reject) => { this._streamObserverPromise @@ -405,6 +411,7 @@ class Result implements Promise { // and result can't bee consumed any further; call the original onError callback after that this._connectionHolder.releaseConnection().then(() => { replaceStacktrace(error, this._stack) + this._error = error onErrorOriginal.call(observer, error) }) } @@ -431,7 +438,9 @@ class Result implements Promise { * @returns {void} */ _cancel(): void { - this._streamObserverPromise.then(o => o.cancel()) + if (this._summary === null && this._error === null) { + this._streamObserverPromise.then(o => o.cancel()) + } } /** diff --git a/packages/core/src/session.ts b/packages/core/src/session.ts index 3c7598a40..bd7ad4424 100644 --- a/packages/core/src/session.ts +++ b/packages/core/src/session.ts @@ -64,7 +64,7 @@ class Session { private _databaseNameResolved: boolean private _lowRecordWatermark: number private _highRecordWatermark: number - + private _results: Result[] /** * @constructor * @protected @@ -128,6 +128,7 @@ class Session { const calculatedWatermaks = this._calculateWatermaks() this._lowRecordWatermark = calculatedWatermaks.low this._highRecordWatermark = calculatedWatermaks.high + this._results = [] } /** @@ -154,7 +155,7 @@ class Session { ? new TxConfig(transactionConfig) : TxConfig.empty() - return this._run(validatedQuery, params, connection => { + const result = this._run(validatedQuery, params, connection => { this._assertSessionIsOpen() return (connection as Connection).protocol().run(validatedQuery, params, { bookmarks: this._lastBookmarks, @@ -169,6 +170,8 @@ class Session { highRecordWatermark: this._highRecordWatermark }) }) + this._results.push(result) + return result } _run( @@ -415,6 +418,9 @@ class Session { async close(): Promise { if (this._open) { this._open = false + + this._results.forEach(result => result._cancel()) + this._transactionExecutor.close() await this._readConnectionHolder.close() diff --git a/packages/core/test/result.test.ts b/packages/core/test/result.test.ts index 674734394..ed8a8e70a 100644 --- a/packages/core/test/result.test.ts +++ b/packages/core/test/result.test.ts @@ -59,6 +59,26 @@ describe('Result', () => { expect(keys).toBe(expectedKeys) }) + it('should reject pre-existing errors', () => { + const expectedError = newError('some error') + streamObserverMock.onError(expectedError) + + expect(result.keys()).rejects.toBe(expectedError) + }) + + it('should reject already consumed pre-existing error', async () => { + const expectedError = newError('some error') + streamObserverMock.onError(expectedError) + + try { + await result + } catch (e) { + // ignore + } + + expect(result.keys()).rejects.toBe(expectedError) + }) + it('should resolve key pushed afterwards', done => { const expectedKeys = ['a', 'c'] @@ -154,6 +174,26 @@ describe('Result', () => { expect(summary).toEqual(expectedSummary) }) + it('should reject a pre-existing error', () => { + const expectedError = newError('the expected error') + streamObserverMock.onError(expectedError) + + expect(result.summary()).rejects.toThrow(expectedError) + }) + + it('should reject already consumed pre-existing error', async () => { + const expectedError = newError('the expected error') + streamObserverMock.onError(expectedError) + + try { + await result + } catch (_) { + // ignore + } + + expect(result.summary()).rejects.toThrow(expectedError) + }) + it('should resolve summary pushe afterwards', done => { const metadata = { resultConsumedAfter: 20, diff --git a/packages/core/test/session.test.ts b/packages/core/test/session.test.ts index bf49bba1b..3976dfee1 100644 --- a/packages/core/test/session.test.ts +++ b/packages/core/test/session.test.ts @@ -204,6 +204,38 @@ describe('session', () => { }) }, 70000) + it('should call cancel current result', done => { + const session = newSessionWithConnection(newFakeConnection()) + + const result = session.run('RETURN 1') + const spiedCancel = jest.spyOn(result, '_cancel') + + session.close() + .finally(() => { + expect(spiedCancel).toHaveBeenCalled() + done() + }) + }) + + it('should call cancel current results', done => { + const session = newSessionWithConnection(newFakeConnection()) + + const spiedCancels: any[] = [] + + for (let i = 0; i < 10; i++) { + const result = session.run('RETURN 1') + spiedCancels.push(jest.spyOn(result, '_cancel')) + } + + session.close() + .finally(() => { + spiedCancels.forEach(spy => { + expect(spy).toHaveBeenCalled() + }) + done() + }) + }) + describe('.lastBookmark()', () => { it.each([ [bookmarks.Bookmarks.empty()], diff --git a/packages/neo4j-driver/test/session.test.js b/packages/neo4j-driver/test/session.test.js index 13c320c4d..920c3ec32 100644 --- a/packages/neo4j-driver/test/session.test.js +++ b/packages/neo4j-driver/test/session.test.js @@ -341,7 +341,7 @@ describe('#integration session', () => { 'Query failed after a long running query was terminated', error ) - done.fail.bind(done) + done.fail(error) }) })