diff --git a/packages/core/src/transaction.ts b/packages/core/src/transaction.ts index 742cbd52c..fbc7a7a0f 100644 --- a/packages/core/src/transaction.ts +++ b/packages/core/src/transaction.ts @@ -59,7 +59,7 @@ class Transaction { private readonly _onError: (error: Error) => Promise private readonly _onComplete: (metadata: any, previousBookmarks?: Bookmarks) => void private readonly _fetchSize: number - private readonly _results: any[] + private readonly _results: Result[] private readonly _impersonatedUser?: string private readonly _lowRecordWatermak: number private readonly _highRecordWatermark: number @@ -291,12 +291,22 @@ class Transaction { } } - _onErrorCallback (): Promise { + _onErrorCallback (error: Error): Promise { // error will be "acknowledged" by sending a RESET message // database will then forget about this transaction and cleanup all corresponding resources // it is thus safe to move this transaction to a FAILED state and disallow any further interactions with it this._state = _states.FAILED this._onClose() + this._results.forEach(result => { + if (result.isOpen()) { + // @ts-expect-error + result._streamObserverPromise + .then(resultStreamObserver => resultStreamObserver.onError(error)) + // Nothing to do since we don't have a observer to notify the error + // the result will be already broke in other ways. + .catch((_: Error) => {}) + } + }) // release connection back to the pool return this._connectionHolder.releaseConnection() diff --git a/packages/core/test/transaction.test.ts b/packages/core/test/transaction.test.ts index 2cbeb0115..5739f1b93 100644 --- a/packages/core/test/transaction.test.ts +++ b/packages/core/test/transaction.test.ts @@ -18,13 +18,14 @@ */ import { ConnectionProvider, newError, NotificationFilter, Transaction, TransactionPromise } from '../src' -import { BeginTransactionConfig } from '../src/connection' +import { BeginTransactionConfig, RunQueryConfig } from '../src/connection' import { Bookmarks } from '../src/internal/bookmarks' import { ConnectionHolder } from '../src/internal/connection-holder' import { Logger } from '../src/internal/logger' import { TxConfig } from '../src/internal/tx-config' import FakeConnection from './utils/connection.fake' import { validNotificationFilters } from './utils/notification-filters.fixtures' +import ResultStreamObserverMock from './utils/result-stream-observer.mock' testTx('Transaction', newRegularTransaction) @@ -392,6 +393,48 @@ function testTx (transactionName: string, newTransaction: }) ) }) + + it('should cascade errors in a result to other open results', async () => { + const connection = newFakeConnection() + const expectedError = newError('Something right is not wrong, wut?') + const tx = newTransaction({ + connection, + fetchSize: 1000, + highRecordWatermark: 700, + lowRecordWatermark: 300 + }) + + const observers: ResultStreamObserverMock[] = [] + + jest.spyOn(connection, 'run') + .mockImplementation((query: string, parameters: any, config: RunQueryConfig) => { + const steamObserver = new ResultStreamObserverMock({ + beforeError: config.beforeError, + afterComplete: config.afterComplete + }) + if (query === 'should error') { + steamObserver.onError(expectedError) + } else if (query === 'finished result') { + steamObserver.onCompleted({}) + } + + observers.push(steamObserver) + + return steamObserver + }) + + tx._begin(async () => Bookmarks.empty(), TxConfig.empty()) + + const nonConsumedResult = tx.run('RETURN 1') + await tx.run('finished result') + const brokenResult = tx.run('should error') + + await expect(brokenResult).rejects.toThrowError(expectedError) + await expect(nonConsumedResult).rejects.toThrowError(expectedError) + expect(observers[0].error).toEqual(expectedError) + expect(observers[1].error).not.toBeDefined() + expect(observers[2].error).toEqual(expectedError) + }) }) describe('.close()', () => { diff --git a/packages/core/test/utils/result-stream-observer.mock.ts b/packages/core/test/utils/result-stream-observer.mock.ts index 502e47861..e6d320fc7 100644 --- a/packages/core/test/utils/result-stream-observer.mock.ts +++ b/packages/core/test/utils/result-stream-observer.mock.ts @@ -28,10 +28,18 @@ export default class ResultStreamObserverMock implements observer.ResultStreamOb private readonly _observers: ResultObserver[] private _error?: Error private _meta?: any + private readonly _beforeError?: (error: Error) => void + private readonly _afterComplete?: (metadata: any) => void - constructor () { + constructor (observers?: { beforeError?: (error: Error) => void, afterComplete?: (metadata: any) => void }) { this._queuedRecords = [] this._observers = [] + this._beforeError = observers?.beforeError + this._afterComplete = observers?.afterComplete + } + + get error (): Error | undefined { + return this._error } cancel (): void {} @@ -88,6 +96,9 @@ export default class ResultStreamObserverMock implements observer.ResultStreamOb onError (error: Error): void { this._error = error + if (this._beforeError != null) { + this._beforeError(error) + } // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this._observers.filter(o => o.onError).forEach(o => o.onError!(error)) } @@ -98,6 +109,10 @@ export default class ResultStreamObserverMock implements observer.ResultStreamOb .filter(o => o.onCompleted) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion .forEach(o => o.onCompleted!(meta)) + + if (this._afterComplete != null) { + this._afterComplete(meta) + } } pause (): void { diff --git a/packages/neo4j-driver-deno/lib/core/transaction.ts b/packages/neo4j-driver-deno/lib/core/transaction.ts index 577093d9d..17df98d16 100644 --- a/packages/neo4j-driver-deno/lib/core/transaction.ts +++ b/packages/neo4j-driver-deno/lib/core/transaction.ts @@ -59,7 +59,7 @@ class Transaction { private readonly _onError: (error: Error) => Promise private readonly _onComplete: (metadata: any, previousBookmarks?: Bookmarks) => void private readonly _fetchSize: number - private readonly _results: any[] + private readonly _results: Result[] private readonly _impersonatedUser?: string private readonly _lowRecordWatermak: number private readonly _highRecordWatermark: number @@ -291,12 +291,22 @@ class Transaction { } } - _onErrorCallback (): Promise { + _onErrorCallback (error: Error): Promise { // error will be "acknowledged" by sending a RESET message // database will then forget about this transaction and cleanup all corresponding resources // it is thus safe to move this transaction to a FAILED state and disallow any further interactions with it this._state = _states.FAILED this._onClose() + this._results.forEach(result => { + if (result.isOpen()) { + // @ts-expect-error + result._streamObserverPromise + .then(resultStreamObserver => resultStreamObserver.onError(error)) + // Nothing to do since we don't have a observer to notify the error + // the result will be already broke in other ways. + .catch((_: Error) => {}) + } + }) // release connection back to the pool return this._connectionHolder.releaseConnection() diff --git a/packages/testkit-backend/deno/controller.ts b/packages/testkit-backend/deno/controller.ts index 511c75d6c..e56c949e8 100644 --- a/packages/testkit-backend/deno/controller.ts +++ b/packages/testkit-backend/deno/controller.ts @@ -34,6 +34,7 @@ function newWire(context: Context, reply: Reply): Wire { name: "DriverError", data: { id, + errorType: e.name, msg: e.message, // @ts-ignore Code Neo4jError does have code code: e.code, diff --git a/packages/testkit-backend/src/controller/local.js b/packages/testkit-backend/src/controller/local.js index 255a47546..eae5311bd 100644 --- a/packages/testkit-backend/src/controller/local.js +++ b/packages/testkit-backend/src/controller/local.js @@ -69,6 +69,7 @@ export default class LocalController extends Controller { const id = this._contexts.get(contextId).addError(e) this._writeResponse(contextId, newResponse('DriverError', { id, + errorType: e.name, msg: e.message, code: e.code, retryable: e.retriable diff --git a/packages/testkit-backend/src/skipped-tests/common.js b/packages/testkit-backend/src/skipped-tests/common.js index 6f767f0c7..ee7e6e3bd 100644 --- a/packages/testkit-backend/src/skipped-tests/common.js +++ b/packages/testkit-backend/src/skipped-tests/common.js @@ -1,17 +1,6 @@ import skip, { ifEquals, ifEndsWith, endsWith, ifStartsWith, startsWith, not, or } from './skip.js' const skippedTests = [ - skip( - "Fixme: transactions don't prevent further actions after failure.", - ifEquals('stub.tx_run.test_tx_run.TestTxRun.test_should_prevent_discard_after_tx_termination_on_run'), - ifEquals('stub.tx_run.test_tx_run.TestTxRun.test_should_prevent_pull_after_tx_termination_on_pull'), - ifEquals('stub.tx_run.test_tx_run.TestTxRun.test_should_prevent_pull_after_tx_termination_on_run'), - ifEquals('stub.tx_run.test_tx_run.TestTxRun.test_should_prevent_run_after_tx_termination_on_run'), - ifEquals('stub.tx_run.test_tx_run.TestTxRun.test_should_prevent_run_after_tx_termination_on_pull'), - ifEquals('stub.tx_run.test_tx_run.TestTxRun.test_should_prevent_rollback_message_after_tx_termination'), - ifEquals('stub.tx_run.test_tx_run.TestTxRun.test_should_prevent_commit_after_tx_termination'), - ifEquals('stub.configuration_hints.test_connection_recv_timeout_seconds.TestDirectConnectionRecvTimeout.test_timeout_unmanaged_tx_should_fail_subsequent_usage_after_timeout') - ), skip( 'Driver does not return offset for old DateTime implementations', ifStartsWith('stub.types.test_temporal_types.TestTemporalTypes') @@ -28,7 +17,7 @@ const skippedTests = [ ifEquals('neo4j.datatypes.test_temporal_types.TestDataTypes.test_duration_components') ), skip( - 'Testkit implemenation is deprecated', + 'Testkit implementation is deprecated', ifEquals('stub.basic_query.test_basic_query.TestBasicQuery.test_5x0_populates_node_only_element_id'), ifEquals('stub.basic_query.test_basic_query.TestBasicQuery.test_5x0_populates_path_element_ids_with_only_string'), ifEquals('stub.basic_query.test_basic_query.TestBasicQuery.test_5x0_populates_rel_only_element_id') @@ -38,7 +27,7 @@ const skippedTests = [ ifEndsWith('neo4j.test_summary.TestSummary.test_protocol_version_information') ), skip( - 'Handle qid omission optmization can cause issues in nested queries', + 'Handle qid omission optimization can cause issues in nested queries', ifEquals('stub.optimizations.test_optimizations.TestOptimizations.test_uses_implicit_default_arguments'), ifEquals('stub.optimizations.test_optimizations.TestOptimizations.test_uses_implicit_default_arguments_multi_query'), ifEquals('stub.optimizations.test_optimizations.TestOptimizations.test_uses_implicit_default_arguments_multi_query_nested') @@ -81,7 +70,7 @@ const skippedTests = [ ifEquals('stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_nested') ), skip( - 'Nested calls does not garauntee order in the records pulling', + 'Nested calls does not guarantee order in the records pulling', ifEquals('stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_nested') ), skip(