Skip to content

Introduce AbortSignal to Driver.executeQuery #1199

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion packages/core/src/driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ class QueryConfig<T = EagerResult> {
resultTransformer?: ResultTransformer<T>
transactionConfig?: TransactionConfig
auth?: AuthToken
signal?: AbortSignal

/**
* @constructor
Expand Down Expand Up @@ -429,6 +430,23 @@ class QueryConfig<T = EagerResult> {
* @see {@link driver}
*/
this.auth = undefined

/**
* The {@link AbortSignal} for aborting query execution.
*
* When aborted, the signal triggers the result consumption cancelation and
* transactions are reset. However, due to race conditions,
* there is no guarantee the transaction will be rolled back.
* Equivalent to {@link Session.close}
*
* **Warning**: This option is only available in runtime which supports AbortSignal.addEventListener.
*
* @since 5.22.0
* @type {AbortSignal|undefined}
* @experimental
* @see https://developer.mozilla.org/en-US/docs/Web/API/EventTarget/addEventListener
*/
this.signal = undefined
}
}

Expand Down Expand Up @@ -595,7 +613,8 @@ class Driver {
database: config.database,
impersonatedUser: config.impersonatedUser,
transactionConfig: config.transactionConfig,
auth: config.auth
auth: config.auth,
signal: config.signal
}, query, parameters)
}

Expand Down
29 changes: 29 additions & 0 deletions packages/core/src/internal/query-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ interface ExecutionConfig<T> {
bookmarkManager?: BookmarkManager
transactionConfig?: TransactionConfig
auth?: AuthToken
signal?: AbortSignal
resultTransformer: (result: Result) => Promise<T>
}

Expand All @@ -49,6 +50,12 @@ export default class QueryExecutor {
auth: config.auth
})

const listenerHandle = installEventListenerWhenPossible(
// Solving linter and types definitions issue
config.signal as unknown as EventTarget,
'abort',
async () => await session.close())

// @ts-expect-error The method is private for external users
session._configureTransactionExecutor(true, TELEMETRY_APIS.EXECUTE_QUERY)

Expand All @@ -62,7 +69,29 @@ export default class QueryExecutor {
return await config.resultTransformer(result)
}, config.transactionConfig)
} finally {
listenerHandle.uninstall()
await session.close()
}
}
}

type Listener = (event: unknown) => unknown

interface EventTarget {
addEventListener?: (type: string, listener: Listener) => unknown
removeEventListener?: (type: string, listener: Listener) => unknown
}

function installEventListenerWhenPossible (target: EventTarget | undefined, event: string, listener: () => unknown): { uninstall: () => void } {
if (typeof target?.addEventListener === 'function') {
target.addEventListener(event, listener)
}

return {
uninstall: () => {
if (typeof target?.removeEventListener === 'function') {
target.removeEventListener(event, listener)
}
}
}
}
5 changes: 4 additions & 1 deletion packages/core/test/driver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,8 @@ describe('Driver', () => {
key: 'value'
}
}
const aAbortController = new AbortController()

async function aTransformer (result: Result): Promise<string> {
const summary = await result.summary()
return summary.database.name ?? 'no-db-set'
Expand All @@ -488,7 +490,8 @@ describe('Driver', () => {
['config.bookmarkManager=null', 'q', {}, { bookmarkManager: null }, extendsDefaultWith({ bookmarkManager: undefined })],
['config.bookmarkManager set to non-null/empty', 'q', {}, { bookmarkManager: theBookmarkManager }, extendsDefaultWith({ bookmarkManager: theBookmarkManager })],
['config.resultTransformer set', 'q', {}, { resultTransformer: aTransformer }, extendsDefaultWith({ resultTransformer: aTransformer })],
['config.transactionConfig set', 'q', {}, { transactionConfig: aTransactionConfig }, extendsDefaultWith({ transactionConfig: aTransactionConfig })]
['config.transactionConfig set', 'q', {}, { transactionConfig: aTransactionConfig }, extendsDefaultWith({ transactionConfig: aTransactionConfig })],
['config.signal set', 'q', {}, { signal: aAbortController.signal }, extendsDefaultWith({ signal: aAbortController.signal })]
])('should handle the params for %s', async (_, query, params, config, buildExpectedConfig) => {
const spiedExecute = jest.spyOn(queryExecutor, 'execute')

Expand Down
114 changes: 113 additions & 1 deletion packages/core/test/internal/query-executor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ describe('QueryExecutor', () => {
}
}

const aAbortController = new AbortController()

it.each([
['bookmarkManager set', { bookmarkManager: aBookmarkManager }, { bookmarkManager: aBookmarkManager }],
['bookmarkManager undefined', { bookmarkManager: undefined }, { bookmarkManager: undefined }],
Expand All @@ -41,7 +43,10 @@ describe('QueryExecutor', () => {
['impersonatedUser set', { impersonatedUser: 'anUser' }, { impersonatedUser: 'anUser' }],
['impersonatedUser undefined', { impersonatedUser: undefined }, { impersonatedUser: undefined }],
['auth set', { auth: { scheme: 'none', credentials: '' } }, { auth: { scheme: 'none', credentials: '' } }],
['auth undefined', { auth: undefined }, { auth: undefined }]
['auth undefined', { auth: undefined }, { auth: undefined }],
['signal set', { signal: aAbortController.signal }, { }],
['signal set signal', { signal: {} as unknown as AbortSignal }, { }],
['signal undefined', { signal: undefined }, { }]
])('should redirect % to the session creation', async (_, executorConfig, expectConfig) => {
const { queryExecutor, createSession } = createExecutor()

Expand Down Expand Up @@ -208,6 +213,56 @@ describe('QueryExecutor', () => {
expect(errorGot).toBe(closeError)
}
})

whenAbortSignalIsEventTarget(() => {
it('should configure listener and remove at end', async () => {
const { queryExecutor, sessionsCreated } = createExecutor()
const controller = new AbortController()
const signal = controller.signal
// @ts-expect-error
const addListenerSpy = jest.spyOn(signal, 'addEventListener')
// @ts-expect-error
const removerListenerSpy = jest.spyOn(signal, 'removeEventListener')

const promise = queryExecutor.execute({ ...baseConfig, signal }, 'query')

expect(addListenerSpy).toHaveBeenCalled()
expect(removerListenerSpy).not.toHaveBeenCalled()

await promise

expect(removerListenerSpy).toHaveBeenCalled()

// Default expectations
expect(sessionsCreated.length).toBe(1)
const [{ spyOnExecuteRead }] = sessionsCreated
expect(spyOnExecuteRead).toHaveBeenCalled()
})

it('should close session when abort', async () => {
const { queryExecutor, sessionsCreated } = createExecutor()
const controller = new AbortController()
const signal = controller.signal
// @ts-expect-error
const removerListenerSpy = jest.spyOn(signal, 'removeEventListener')

const promise = queryExecutor.execute({ ...baseConfig, signal }, 'query')

controller.abort()

// Expect to close session
expect(sessionsCreated[0].session.close).toHaveBeenCalled()

await promise

expect(removerListenerSpy).toHaveBeenCalled()

// Default expectations
expect(sessionsCreated.length).toBe(1)
const [{ spyOnExecuteRead }] = sessionsCreated
expect(spyOnExecuteRead).toHaveBeenCalled()
})
})
})

describe('when routing="WRITE"', () => {
Expand Down Expand Up @@ -364,6 +419,56 @@ describe('QueryExecutor', () => {
expect(errorGot).toBe(closeError)
}
})

whenAbortSignalIsEventTarget(() => {
it('should configure listener and remove at end', async () => {
const { queryExecutor, sessionsCreated } = createExecutor()
const controller = new AbortController()
const signal = controller.signal
// @ts-expect-error
const addListenerSpy = jest.spyOn(signal, 'addEventListener')
// @ts-expect-error
const removerListenerSpy = jest.spyOn(signal, 'removeEventListener')

const promise = queryExecutor.execute({ ...baseConfig, signal }, 'query')

expect(addListenerSpy).toHaveBeenCalled()
expect(removerListenerSpy).not.toHaveBeenCalled()

await promise

expect(removerListenerSpy).toHaveBeenCalled()

// Default expectations
expect(sessionsCreated.length).toBe(1)
const [{ spyOnExecuteWrite }] = sessionsCreated
expect(spyOnExecuteWrite).toHaveBeenCalled()
})

it('should close session when abort', async () => {
const { queryExecutor, sessionsCreated } = createExecutor()
const controller = new AbortController()
const signal = controller.signal
// @ts-expect-error
const removerListenerSpy = jest.spyOn(signal, 'removeEventListener')

const promise = queryExecutor.execute({ ...baseConfig, signal }, 'query')

controller.abort()

// Expect to close session
expect(sessionsCreated[0].session.close).toHaveBeenCalled()

await promise

expect(removerListenerSpy).toHaveBeenCalled()

// Default expectations
expect(sessionsCreated.length).toBe(1)
const [{ spyOnExecuteWrite }] = sessionsCreated
expect(spyOnExecuteWrite).toHaveBeenCalled()
})
})
})

function createExecutor ({
Expand Down Expand Up @@ -455,3 +560,10 @@ describe('QueryExecutor', () => {
}
}
})

function whenAbortSignalIsEventTarget (fn: () => unknown): void {
// @ts-expect-error AbortSignal doesn't implements EventTarget on this TS Config.
if (typeof AbortSignal.prototype.addEventListener === 'function') {
describe('when abort signal is event target', fn)
}
}
21 changes: 20 additions & 1 deletion packages/neo4j-driver-deno/lib/core/driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ class QueryConfig<T = EagerResult> {
resultTransformer?: ResultTransformer<T>
transactionConfig?: TransactionConfig
auth?: AuthToken
signal?: AbortSignal

/**
* @constructor
Expand Down Expand Up @@ -429,6 +430,23 @@ class QueryConfig<T = EagerResult> {
* @see {@link driver}
*/
this.auth = undefined

/**
* The {@link AbortSignal} for aborting query execution.
*
* When aborted, the signal triggers the result consumption cancelation and
* transactions are reset. However, due to race conditions,
* there is no guarantee the transaction will be rolled back.
* Equivalent to {@link Session.close}
*
* **Warning**: This option is only available in runtime which supports AbortSignal.addEventListener.
*
* @since 5.22.0
* @type {AbortSignal|undefined}
* @experimental
* @see https://developer.mozilla.org/en-US/docs/Web/API/EventTarget/addEventListener
*/
this.signal = undefined
}
}

Expand Down Expand Up @@ -595,7 +613,8 @@ class Driver {
database: config.database,
impersonatedUser: config.impersonatedUser,
transactionConfig: config.transactionConfig,
auth: config.auth
auth: config.auth,
signal: config.signal
}, query, parameters)
}

Expand Down
29 changes: 29 additions & 0 deletions packages/neo4j-driver-deno/lib/core/internal/query-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ interface ExecutionConfig<T> {
bookmarkManager?: BookmarkManager
transactionConfig?: TransactionConfig
auth?: AuthToken
signal?: AbortSignal
resultTransformer: (result: Result) => Promise<T>
}

Expand All @@ -49,6 +50,12 @@ export default class QueryExecutor {
auth: config.auth
})

const listenerHandle = installEventListenerWhenPossible(
// Solving linter and types definitions issue
config.signal as unknown as EventTarget,
'abort',
async () => await session.close())

// @ts-expect-error The method is private for external users
session._configureTransactionExecutor(true, TELEMETRY_APIS.EXECUTE_QUERY)

Expand All @@ -62,7 +69,29 @@ export default class QueryExecutor {
return await config.resultTransformer(result)
}, config.transactionConfig)
} finally {
listenerHandle.uninstall()
await session.close()
}
}
}

type Listener = (event: unknown) => unknown

interface EventTarget {
addEventListener?: (type: string, listener: Listener) => unknown
removeEventListener?: (type: string, listener: Listener) => unknown
}

function installEventListenerWhenPossible (target: EventTarget | undefined, event: string, listener: () => unknown): { uninstall: () => void } {
if (typeof target?.addEventListener === 'function') {
target.addEventListener(event, listener)
}

return {
uninstall: () => {
if (typeof target?.removeEventListener === 'function') {
target.removeEventListener(event, listener)
}
}
}
}