Skip to content

Verify transaction begin before hand-over it to the tx function work #875

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions packages/core/src/internal/transaction-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@

import { newError } from '../error'
import Transaction from '../transaction'
import TransactionPromise from '../transaction-promise'
import { canRetryOn } from './retry-strategy'

const DEFAULT_MAX_RETRY_TIME_MS = 30 * 1000 // 30 seconds
const DEFAULT_INITIAL_RETRY_DELAY_MS = 1000 // 1 seconds
const DEFAULT_RETRY_DELAY_MULTIPLIER = 2.0
const DEFAULT_RETRY_DELAY_JITTER_FACTOR = 0.2

type TransactionCreator = () => Transaction
type TransactionCreator = () => TransactionPromise
type TransactionWork<T> = (tx: Transaction) => T | Promise<T>
type Resolve<T> = (value: T | PromiseLike<T>) => void
type Reject = (value: any) => void
Expand Down Expand Up @@ -138,15 +139,15 @@ export class TransactionExecutor {
})
}

_executeTransactionInsidePromise<T>(
async _executeTransactionInsidePromise<T>(
transactionCreator: TransactionCreator,
transactionWork: TransactionWork<T>,
resolve: Resolve<T>,
reject: Reject
): void {
): Promise<void> {
let tx: Transaction
try {
tx = transactionCreator()
tx = await transactionCreator()
} catch (error) {
// failed to create a transaction
reject(error)
Expand Down
35 changes: 20 additions & 15 deletions packages/core/test/session.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ describe('session', () => {
})

it('run should send watermarks to Transaction when fetchsize if defined (writeTransaction)', async () => {
const connection = newFakeConnection()
const connection = mockBeginWithSuccess(newFakeConnection())
const session = newSessionWithConnection(connection, false, 1000)
const status = { functionCalled: false }

Expand All @@ -118,7 +118,7 @@ describe('session', () => {
})

it('run should send watermarks to Transaction when fetchsize is fetch all (writeTransaction)', async () => {
const connection = newFakeConnection()
const connection = mockBeginWithSuccess(newFakeConnection())
const session = newSessionWithConnection(connection, false, FETCH_ALL)
const status = { functionCalled: false }

Expand All @@ -135,7 +135,7 @@ describe('session', () => {
})

it('run should send watermarks to Transaction when fetchsize if defined (readTransaction)', async () => {
const connection = newFakeConnection()
const connection = mockBeginWithSuccess(newFakeConnection())
const session = newSessionWithConnection(connection, false, 1000)
const status = { functionCalled: false }

Expand All @@ -152,7 +152,7 @@ describe('session', () => {
})

it('run should send watermarks to Transaction when fetchsize is fetch all (readTransaction)', async () => {
const connection = newFakeConnection()
const connection = mockBeginWithSuccess(newFakeConnection())
const session = newSessionWithConnection(connection, false, FETCH_ALL)
const status = { functionCalled: false }

Expand Down Expand Up @@ -238,17 +238,8 @@ describe('session', () => {
})

it('should resolves a Transaction', async () => {
const connection = newFakeConnection()
const protocol = connection.protocol()
// @ts-ignore
connection.protocol = () => {
return {
...protocol,
beginTransaction: (params: { afterComplete: () => {} }) => {
params.afterComplete()
}
}
}
const connection = mockBeginWithSuccess(newFakeConnection())

const session = newSessionWithConnection(connection, false, 1000)

const tx: Transaction = await session.beginTransaction()
Expand All @@ -258,6 +249,20 @@ describe('session', () => {
})
})

function mockBeginWithSuccess(connection: FakeConnection) {
const protocol = connection.protocol()
// @ts-ignore
connection.protocol = () => {
return {
...protocol,
beginTransaction: (params: { afterComplete: () => {}} ) => {
params.afterComplete()
}
}
}
return connection
}

function newSessionWithConnection(
connection: Connection,
beginTx: boolean = true,
Expand Down
51 changes: 51 additions & 0 deletions packages/neo4j-driver/test/internal/transaction-executor.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,22 @@ describe('#unit TransactionExecutor', () => {
])
}, 30000)

it('should retry when given transaction creator fail on begin once', async () => {
await testRetryWhenTransactionBeginFails([SERVICE_UNAVAILABLE])
}, 30000)

it('should retry when given transaction creator throws on begin many times', async () => {
await testRetryWhenTransactionBeginFails([
SERVICE_UNAVAILABLE,
SESSION_EXPIRED,
TRANSIENT_ERROR_2,
SESSION_EXPIRED,
SERVICE_UNAVAILABLE,
TRANSIENT_ERROR_1,
'Neo.ClientError.Security.AuthorizationExpired'
])
}, 30000)

it('should retry when given transaction work throws once', async () => {
await testRetryWhenTransactionWorkThrows([SERVICE_UNAVAILABLE])
}, 30000)
Expand Down Expand Up @@ -294,6 +310,30 @@ describe('#unit TransactionExecutor', () => {
}
}

async function testRetryWhenTransactionBeginFails (errorCodes) {
const fakeSetTimeout = setTimeoutMock.install()
try {
const executor = new TransactionExecutor()
const transactionCreator = throwingTransactionCreatorOnBegin(
errorCodes,
new FakeTransaction()
)
const usedTransactions = []

const result = await executor.execute(transactionCreator, tx => {
expect(tx).toBeDefined()
usedTransactions.push(tx)
return Promise.resolve(42)
})

expect(usedTransactions.length).toEqual(1)
expect(result).toEqual(42)
verifyRetryDelays(fakeSetTimeout, errorCodes.length)
} finally {
fakeSetTimeout.uninstall()
}
}

async function testRetryWhenTransactionWorkReturnsRejectedPromise (
errorCodes
) {
Expand Down Expand Up @@ -451,6 +491,17 @@ function throwingTransactionCreator (errorCodes, result) {
}
}

function throwingTransactionCreatorOnBegin (errorCodes, result) {
const remainingErrorCodes = errorCodes.slice().reverse()
return () => {
if (remainingErrorCodes.length === 0) {
return Promise.resolve(result)
}
const errorCode = remainingErrorCodes.pop()
return Promise.reject(error(errorCode))
}
}

function throwingTransactionWork (errorCodes, result) {
const remainingErrorCodes = errorCodes.slice().reverse()
return () => {
Expand Down
5 changes: 0 additions & 5 deletions packages/testkit-backend/src/skipped-tests/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@ const skippedTests = [
ifEquals('stub.optimizations.test_optimizations.TestOptimizations.test_uses_implicit_default_arguments_multi_query'),
ifEquals('stub.optimizations.test_optimizations.TestOptimizations.test_uses_implicit_default_arguments_multi_query_nested')
),
skip(
'Eager verification not implemented for tx functions',
ifEquals('stub.tx_run.test_tx_run.TestTxRun.test_eager_begin_on_tx_func_run_with_error_on_begin'),
ifEquals('stub.tx_run.test_tx_run.TestTxRun.test_eager_begin_on_tx_func_run_with_disconnect_on_begin')
),
skip(
'Fail while enable Temporary::ResultKeys',
ifEquals('neo4j.test_bookmarks.TestBookmarks.test_can_pass_bookmark_into_next_session'),
Expand Down