Skip to content

Commit d614b1f

Browse files
authored
Discards active auto-commit results on Session.close() (#889)
Discarding the elements before reseting the Session allows non-consumed results to be committed before reset the result. This PR also improves the `keys()` and `summary()` methods by allowing get errors cached in the result.
1 parent 524d883 commit d614b1f

File tree

5 files changed

+93
-6
lines changed

5 files changed

+93
-6
lines changed

packages/core/src/result.ts

+12-3
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ class Result implements Promise<QueryResult> {
115115
private _connectionHolder: connectionHolder.ConnectionHolder
116116
private _keys: string[] | null
117117
private _summary: ResultSummary | null
118+
private _error: Error | null
118119
private _watermarks: { high: number; low: number }
119120

120121
/**
@@ -141,6 +142,7 @@ class Result implements Promise<QueryResult> {
141142
this._connectionHolder = connectionHolder || EMPTY_CONNECTION_HOLDER
142143
this._keys = null
143144
this._summary = null
145+
this._error = null
144146
this._watermarks = watermarks
145147
}
146148

@@ -154,8 +156,10 @@ class Result implements Promise<QueryResult> {
154156
}
155157
*/
156158
keys(): Promise<string[]> {
157-
if (this._keys != null) {
159+
if (this._keys !== null) {
158160
return Promise.resolve(this._keys)
161+
} else if (this._error !== null) {
162+
return Promise.reject(this._error)
159163
}
160164
return new Promise((resolve, reject) => {
161165
this._streamObserverPromise
@@ -179,8 +183,10 @@ class Result implements Promise<QueryResult> {
179183
*
180184
*/
181185
summary(): Promise<ResultSummary> {
182-
if (this._summary != null) {
186+
if (this._summary !== null) {
183187
return Promise.resolve(this._summary)
188+
} else if (this._error !== null) {
189+
return Promise.reject(this._error)
184190
}
185191
return new Promise((resolve, reject) => {
186192
this._streamObserverPromise
@@ -405,6 +411,7 @@ class Result implements Promise<QueryResult> {
405411
// and result can't bee consumed any further; call the original onError callback after that
406412
this._connectionHolder.releaseConnection().then(() => {
407413
replaceStacktrace(error, this._stack)
414+
this._error = error
408415
onErrorOriginal.call(observer, error)
409416
})
410417
}
@@ -431,7 +438,9 @@ class Result implements Promise<QueryResult> {
431438
* @returns {void}
432439
*/
433440
_cancel(): void {
434-
this._streamObserverPromise.then(o => o.cancel())
441+
if (this._summary === null && this._error === null) {
442+
this._streamObserverPromise.then(o => o.cancel())
443+
}
435444
}
436445

437446
/**

packages/core/src/session.ts

+8-2
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class Session {
6464
private _databaseNameResolved: boolean
6565
private _lowRecordWatermark: number
6666
private _highRecordWatermark: number
67-
67+
private _results: Result[]
6868
/**
6969
* @constructor
7070
* @protected
@@ -128,6 +128,7 @@ class Session {
128128
const calculatedWatermaks = this._calculateWatermaks()
129129
this._lowRecordWatermark = calculatedWatermaks.low
130130
this._highRecordWatermark = calculatedWatermaks.high
131+
this._results = []
131132
}
132133

133134
/**
@@ -154,7 +155,7 @@ class Session {
154155
? new TxConfig(transactionConfig)
155156
: TxConfig.empty()
156157

157-
return this._run(validatedQuery, params, connection => {
158+
const result = this._run(validatedQuery, params, connection => {
158159
this._assertSessionIsOpen()
159160
return (connection as Connection).protocol().run(validatedQuery, params, {
160161
bookmarks: this._lastBookmarks,
@@ -169,6 +170,8 @@ class Session {
169170
highRecordWatermark: this._highRecordWatermark
170171
})
171172
})
173+
this._results.push(result)
174+
return result
172175
}
173176

174177
_run(
@@ -415,6 +418,9 @@ class Session {
415418
async close(): Promise<void> {
416419
if (this._open) {
417420
this._open = false
421+
422+
this._results.forEach(result => result._cancel())
423+
418424
this._transactionExecutor.close()
419425

420426
await this._readConnectionHolder.close()

packages/core/test/result.test.ts

+40
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,26 @@ describe('Result', () => {
5959
expect(keys).toBe(expectedKeys)
6060
})
6161

62+
it('should reject pre-existing errors', () => {
63+
const expectedError = newError('some error')
64+
streamObserverMock.onError(expectedError)
65+
66+
expect(result.keys()).rejects.toBe(expectedError)
67+
})
68+
69+
it('should reject already consumed pre-existing error', async () => {
70+
const expectedError = newError('some error')
71+
streamObserverMock.onError(expectedError)
72+
73+
try {
74+
await result
75+
} catch (e) {
76+
// ignore
77+
}
78+
79+
expect(result.keys()).rejects.toBe(expectedError)
80+
})
81+
6282
it('should resolve key pushed afterwards', done => {
6383
const expectedKeys = ['a', 'c']
6484

@@ -154,6 +174,26 @@ describe('Result', () => {
154174
expect(summary).toEqual(expectedSummary)
155175
})
156176

177+
it('should reject a pre-existing error', () => {
178+
const expectedError = newError('the expected error')
179+
streamObserverMock.onError(expectedError)
180+
181+
expect(result.summary()).rejects.toThrow(expectedError)
182+
})
183+
184+
it('should reject already consumed pre-existing error', async () => {
185+
const expectedError = newError('the expected error')
186+
streamObserverMock.onError(expectedError)
187+
188+
try {
189+
await result
190+
} catch (_) {
191+
// ignore
192+
}
193+
194+
expect(result.summary()).rejects.toThrow(expectedError)
195+
})
196+
157197
it('should resolve summary pushe afterwards', done => {
158198
const metadata = {
159199
resultConsumedAfter: 20,

packages/core/test/session.test.ts

+32
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,38 @@ describe('session', () => {
204204
})
205205
}, 70000)
206206

207+
it('should call cancel current result', done => {
208+
const session = newSessionWithConnection(newFakeConnection())
209+
210+
const result = session.run('RETURN 1')
211+
const spiedCancel = jest.spyOn(result, '_cancel')
212+
213+
session.close()
214+
.finally(() => {
215+
expect(spiedCancel).toHaveBeenCalled()
216+
done()
217+
})
218+
})
219+
220+
it('should call cancel current results', done => {
221+
const session = newSessionWithConnection(newFakeConnection())
222+
223+
const spiedCancels: any[] = []
224+
225+
for (let i = 0; i < 10; i++) {
226+
const result = session.run('RETURN 1')
227+
spiedCancels.push(jest.spyOn(result, '_cancel'))
228+
}
229+
230+
session.close()
231+
.finally(() => {
232+
spiedCancels.forEach(spy => {
233+
expect(spy).toHaveBeenCalled()
234+
})
235+
done()
236+
})
237+
})
238+
207239
describe('.lastBookmark()', () => {
208240
it.each([
209241
[bookmarks.Bookmarks.empty()],

packages/neo4j-driver/test/session.test.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ describe('#integration session', () => {
341341
'Query failed after a long running query was terminated',
342342
error
343343
)
344-
done.fail.bind(done)
344+
done.fail(error)
345345
})
346346
})
347347

0 commit comments

Comments
 (0)