Skip to content

Avoid Results try to pull more records after transaction fail #1145

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions packages/core/src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class Transaction {
private readonly _onError: (error: Error) => Promise<Connection | null>
private readonly _onComplete: (metadata: any, previousBookmarks?: Bookmarks) => void
private readonly _fetchSize: number
private readonly _results: any[]
private readonly _results: Result[]
private readonly _impersonatedUser?: string
private readonly _lowRecordWatermak: number
private readonly _highRecordWatermark: number
Expand Down Expand Up @@ -291,12 +291,22 @@ class Transaction {
}
}

_onErrorCallback (): Promise<Connection | null> {
_onErrorCallback (error: Error): Promise<Connection | null> {
// error will be "acknowledged" by sending a RESET message
// database will then forget about this transaction and cleanup all corresponding resources
// it is thus safe to move this transaction to a FAILED state and disallow any further interactions with it
this._state = _states.FAILED
this._onClose()
this._results.forEach(result => {
if (result.isOpen()) {
// @ts-expect-error
result._streamObserverPromise
.then(resultStreamObserver => resultStreamObserver.onError(error))
// Nothing to do since we don't have a observer to notify the error
// the result will be already broke in other ways.
.catch((_: Error) => {})
}
})

// release connection back to the pool
return this._connectionHolder.releaseConnection()
Expand Down
45 changes: 44 additions & 1 deletion packages/core/test/transaction.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@
*/

import { ConnectionProvider, newError, NotificationFilter, Transaction, TransactionPromise } from '../src'
import { BeginTransactionConfig } from '../src/connection'
import { BeginTransactionConfig, RunQueryConfig } from '../src/connection'
import { Bookmarks } from '../src/internal/bookmarks'
import { ConnectionHolder } from '../src/internal/connection-holder'
import { Logger } from '../src/internal/logger'
import { TxConfig } from '../src/internal/tx-config'
import FakeConnection from './utils/connection.fake'
import { validNotificationFilters } from './utils/notification-filters.fixtures'
import ResultStreamObserverMock from './utils/result-stream-observer.mock'

testTx('Transaction', newRegularTransaction)

Expand Down Expand Up @@ -392,6 +393,48 @@ function testTx<T extends Transaction> (transactionName: string, newTransaction:
})
)
})

it('should cascade errors in a result to other open results', async () => {
const connection = newFakeConnection()
const expectedError = newError('Something right is not wrong, wut?')
const tx = newTransaction({
connection,
fetchSize: 1000,
highRecordWatermark: 700,
lowRecordWatermark: 300
})

const observers: ResultStreamObserverMock[] = []

jest.spyOn(connection, 'run')
.mockImplementation((query: string, parameters: any, config: RunQueryConfig) => {
const steamObserver = new ResultStreamObserverMock({
beforeError: config.beforeError,
afterComplete: config.afterComplete
})
if (query === 'should error') {
steamObserver.onError(expectedError)
} else if (query === 'finished result') {
steamObserver.onCompleted({})
}

observers.push(steamObserver)

return steamObserver
})

tx._begin(async () => Bookmarks.empty(), TxConfig.empty())

const nonConsumedResult = tx.run('RETURN 1')
await tx.run('finished result')
const brokenResult = tx.run('should error')

await expect(brokenResult).rejects.toThrowError(expectedError)
await expect(nonConsumedResult).rejects.toThrowError(expectedError)
expect(observers[0].error).toEqual(expectedError)
expect(observers[1].error).not.toBeDefined()
expect(observers[2].error).toEqual(expectedError)
})
})

describe('.close()', () => {
Expand Down
17 changes: 16 additions & 1 deletion packages/core/test/utils/result-stream-observer.mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,18 @@ export default class ResultStreamObserverMock implements observer.ResultStreamOb
private readonly _observers: ResultObserver[]
private _error?: Error
private _meta?: any
private readonly _beforeError?: (error: Error) => void
private readonly _afterComplete?: (metadata: any) => void

constructor () {
constructor (observers?: { beforeError?: (error: Error) => void, afterComplete?: (metadata: any) => void }) {
this._queuedRecords = []
this._observers = []
this._beforeError = observers?.beforeError
this._afterComplete = observers?.afterComplete
}

get error (): Error | undefined {
return this._error
}

cancel (): void {}
Expand Down Expand Up @@ -88,6 +96,9 @@ export default class ResultStreamObserverMock implements observer.ResultStreamOb

onError (error: Error): void {
this._error = error
if (this._beforeError != null) {
this._beforeError(error)
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this._observers.filter(o => o.onError).forEach(o => o.onError!(error))
}
Expand All @@ -98,6 +109,10 @@ export default class ResultStreamObserverMock implements observer.ResultStreamOb
.filter(o => o.onCompleted)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
.forEach(o => o.onCompleted!(meta))

if (this._afterComplete != null) {
this._afterComplete(meta)
}
}

pause (): void {
Expand Down
14 changes: 12 additions & 2 deletions packages/neo4j-driver-deno/lib/core/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class Transaction {
private readonly _onError: (error: Error) => Promise<Connection | null>
private readonly _onComplete: (metadata: any, previousBookmarks?: Bookmarks) => void
private readonly _fetchSize: number
private readonly _results: any[]
private readonly _results: Result[]
private readonly _impersonatedUser?: string
private readonly _lowRecordWatermak: number
private readonly _highRecordWatermark: number
Expand Down Expand Up @@ -291,12 +291,22 @@ class Transaction {
}
}

_onErrorCallback (): Promise<Connection | null> {
_onErrorCallback (error: Error): Promise<Connection | null> {
// error will be "acknowledged" by sending a RESET message
// database will then forget about this transaction and cleanup all corresponding resources
// it is thus safe to move this transaction to a FAILED state and disallow any further interactions with it
this._state = _states.FAILED
this._onClose()
this._results.forEach(result => {
if (result.isOpen()) {
// @ts-expect-error
result._streamObserverPromise
.then(resultStreamObserver => resultStreamObserver.onError(error))
// Nothing to do since we don't have a observer to notify the error
// the result will be already broke in other ways.
.catch((_: Error) => {})
}
})

// release connection back to the pool
return this._connectionHolder.releaseConnection()
Expand Down
1 change: 1 addition & 0 deletions packages/testkit-backend/deno/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ function newWire(context: Context, reply: Reply): Wire {
name: "DriverError",
data: {
id,
errorType: e.name,
msg: e.message,
// @ts-ignore Code Neo4jError does have code
code: e.code,
Expand Down
1 change: 1 addition & 0 deletions packages/testkit-backend/src/controller/local.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ export default class LocalController extends Controller {
const id = this._contexts.get(contextId).addError(e)
this._writeResponse(contextId, newResponse('DriverError', {
id,
errorType: e.name,
msg: e.message,
code: e.code,
retryable: e.retriable
Expand Down
17 changes: 3 additions & 14 deletions packages/testkit-backend/src/skipped-tests/common.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,6 @@
import skip, { ifEquals, ifEndsWith, endsWith, ifStartsWith, startsWith, not, or } from './skip.js'

const skippedTests = [
skip(
"Fixme: transactions don't prevent further actions after failure.",
ifEquals('stub.tx_run.test_tx_run.TestTxRun.test_should_prevent_discard_after_tx_termination_on_run'),
ifEquals('stub.tx_run.test_tx_run.TestTxRun.test_should_prevent_pull_after_tx_termination_on_pull'),
ifEquals('stub.tx_run.test_tx_run.TestTxRun.test_should_prevent_pull_after_tx_termination_on_run'),
ifEquals('stub.tx_run.test_tx_run.TestTxRun.test_should_prevent_run_after_tx_termination_on_run'),
ifEquals('stub.tx_run.test_tx_run.TestTxRun.test_should_prevent_run_after_tx_termination_on_pull'),
ifEquals('stub.tx_run.test_tx_run.TestTxRun.test_should_prevent_rollback_message_after_tx_termination'),
ifEquals('stub.tx_run.test_tx_run.TestTxRun.test_should_prevent_commit_after_tx_termination'),
ifEquals('stub.configuration_hints.test_connection_recv_timeout_seconds.TestDirectConnectionRecvTimeout.test_timeout_unmanaged_tx_should_fail_subsequent_usage_after_timeout')
),
skip(
'Driver does not return offset for old DateTime implementations',
ifStartsWith('stub.types.test_temporal_types.TestTemporalTypes')
Expand All @@ -28,7 +17,7 @@ const skippedTests = [
ifEquals('neo4j.datatypes.test_temporal_types.TestDataTypes.test_duration_components')
),
skip(
'Testkit implemenation is deprecated',
'Testkit implementation is deprecated',
ifEquals('stub.basic_query.test_basic_query.TestBasicQuery.test_5x0_populates_node_only_element_id'),
ifEquals('stub.basic_query.test_basic_query.TestBasicQuery.test_5x0_populates_path_element_ids_with_only_string'),
ifEquals('stub.basic_query.test_basic_query.TestBasicQuery.test_5x0_populates_rel_only_element_id')
Expand All @@ -38,7 +27,7 @@ const skippedTests = [
ifEndsWith('neo4j.test_summary.TestSummary.test_protocol_version_information')
),
skip(
'Handle qid omission optmization can cause issues in nested queries',
'Handle qid omission optimization can cause issues in nested queries',
ifEquals('stub.optimizations.test_optimizations.TestOptimizations.test_uses_implicit_default_arguments'),
ifEquals('stub.optimizations.test_optimizations.TestOptimizations.test_uses_implicit_default_arguments_multi_query'),
ifEquals('stub.optimizations.test_optimizations.TestOptimizations.test_uses_implicit_default_arguments_multi_query_nested')
Expand Down Expand Up @@ -81,7 +70,7 @@ const skippedTests = [
ifEquals('stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_nested')
),
skip(
'Nested calls does not garauntee order in the records pulling',
'Nested calls does not guarantee order in the records pulling',
ifEquals('stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_nested')
),
skip(
Expand Down