Skip to content

Discard active auto-commit results on Session.close() #889

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions packages/core/src/result.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class Result implements Promise<QueryResult> {
private _connectionHolder: connectionHolder.ConnectionHolder
private _keys: string[] | null
private _summary: ResultSummary | null
private _error: Error | null
private _watermarks: { high: number; low: number }

/**
Expand All @@ -141,6 +142,7 @@ class Result implements Promise<QueryResult> {
this._connectionHolder = connectionHolder || EMPTY_CONNECTION_HOLDER
this._keys = null
this._summary = null
this._error = null
this._watermarks = watermarks
}

Expand All @@ -154,8 +156,10 @@ class Result implements Promise<QueryResult> {
}
*/
keys(): Promise<string[]> {
if (this._keys != null) {
if (this._keys !== null) {
return Promise.resolve(this._keys)
} else if (this._error !== null) {
return Promise.reject(this._error)
}
return new Promise((resolve, reject) => {
this._streamObserverPromise
Expand All @@ -179,8 +183,10 @@ class Result implements Promise<QueryResult> {
*
*/
summary(): Promise<ResultSummary> {
if (this._summary != null) {
if (this._summary !== null) {
return Promise.resolve(this._summary)
} else if (this._error !== null) {
return Promise.reject(this._error)
}
return new Promise((resolve, reject) => {
this._streamObserverPromise
Expand Down Expand Up @@ -405,6 +411,7 @@ class Result implements Promise<QueryResult> {
// and result can't bee consumed any further; call the original onError callback after that
this._connectionHolder.releaseConnection().then(() => {
replaceStacktrace(error, this._stack)
this._error = error
onErrorOriginal.call(observer, error)
})
}
Expand All @@ -431,7 +438,9 @@ class Result implements Promise<QueryResult> {
* @returns {void}
*/
_cancel(): void {
this._streamObserverPromise.then(o => o.cancel())
if (this._summary === null && this._error === null) {
this._streamObserverPromise.then(o => o.cancel())
}
}

/**
Expand Down
10 changes: 8 additions & 2 deletions packages/core/src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class Session {
private _databaseNameResolved: boolean
private _lowRecordWatermark: number
private _highRecordWatermark: number

private _results: Result[]
/**
* @constructor
* @protected
Expand Down Expand Up @@ -128,6 +128,7 @@ class Session {
const calculatedWatermaks = this._calculateWatermaks()
this._lowRecordWatermark = calculatedWatermaks.low
this._highRecordWatermark = calculatedWatermaks.high
this._results = []
}

/**
Expand All @@ -154,7 +155,7 @@ class Session {
? new TxConfig(transactionConfig)
: TxConfig.empty()

return this._run(validatedQuery, params, connection => {
const result = this._run(validatedQuery, params, connection => {
this._assertSessionIsOpen()
return (connection as Connection).protocol().run(validatedQuery, params, {
bookmarks: this._lastBookmarks,
Expand All @@ -169,6 +170,8 @@ class Session {
highRecordWatermark: this._highRecordWatermark
})
})
this._results.push(result)
return result
}

_run(
Expand Down Expand Up @@ -415,6 +418,9 @@ class Session {
async close(): Promise<void> {
if (this._open) {
this._open = false

this._results.forEach(result => result._cancel())

this._transactionExecutor.close()

await this._readConnectionHolder.close()
Expand Down
40 changes: 40 additions & 0 deletions packages/core/test/result.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,26 @@ describe('Result', () => {
expect(keys).toBe(expectedKeys)
})

it('should reject pre-existing errors', () => {
const expectedError = newError('some error')
streamObserverMock.onError(expectedError)

expect(result.keys()).rejects.toBe(expectedError)
})

it('should reject already consumed pre-existing error', async () => {
const expectedError = newError('some error')
streamObserverMock.onError(expectedError)

try {
await result
} catch (e) {
// ignore
}

expect(result.keys()).rejects.toBe(expectedError)
})

it('should resolve key pushed afterwards', done => {
const expectedKeys = ['a', 'c']

Expand Down Expand Up @@ -154,6 +174,26 @@ describe('Result', () => {
expect(summary).toEqual(expectedSummary)
})

it('should reject a pre-existing error', () => {
const expectedError = newError('the expected error')
streamObserverMock.onError(expectedError)

expect(result.summary()).rejects.toThrow(expectedError)
})

it('should reject already consumed pre-existing error', async () => {
const expectedError = newError('the expected error')
streamObserverMock.onError(expectedError)

try {
await result
} catch (_) {
// ignore
}

expect(result.summary()).rejects.toThrow(expectedError)
})

it('should resolve summary pushe afterwards', done => {
const metadata = {
resultConsumedAfter: 20,
Expand Down
32 changes: 32 additions & 0 deletions packages/core/test/session.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,38 @@ describe('session', () => {
})
}, 70000)

it('should call cancel current result', done => {
const session = newSessionWithConnection(newFakeConnection())

const result = session.run('RETURN 1')
const spiedCancel = jest.spyOn(result, '_cancel')

session.close()
.finally(() => {
expect(spiedCancel).toHaveBeenCalled()
done()
})
})

it('should call cancel current results', done => {
const session = newSessionWithConnection(newFakeConnection())

const spiedCancels: any[] = []

for (let i = 0; i < 10; i++) {
const result = session.run('RETURN 1')
spiedCancels.push(jest.spyOn(result, '_cancel'))
}

session.close()
.finally(() => {
spiedCancels.forEach(spy => {
expect(spy).toHaveBeenCalled()
})
done()
})
})

describe('.lastBookmark()', () => {
it.each([
[bookmarks.Bookmarks.empty()],
Expand Down
2 changes: 1 addition & 1 deletion packages/neo4j-driver/test/session.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ describe('#integration session', () => {
'Query failed after a long running query was terminated',
error
)
done.fail.bind(done)
done.fail(error)
})
})

Expand Down