Skip to content

fix(alm): prevent zombie listeners caused by forked tasks #2070

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
"scripts": {
"build": "yarn build:packages",
"test": "yarn test:packages",
"build:examples": "yarn workspaces foreach --include '@reduxjs/*' --include '@examples-query-react/*' -vtp run build",
"build:examples": "yarn workspaces foreach --include '@reduxjs/*' --include '@examples-query-react/*' --include '@examples-action-listener/*' -vtp run build",
"build:docs": "yarn workspace website run build",
"build:packages": "yarn workspaces foreach --include '@reduxjs/*' --include '@rtk-query/*' --include '@rtk-incubator/*' --topological-dev run build",
"test:packages": "yarn workspaces foreach --include '@reduxjs/*' --include '@rtk-query/*' --include '@rtk-incubator/*' run test",
Expand Down
2 changes: 1 addition & 1 deletion packages/action-listener-middleware/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ Both these methods are cancelation-aware, and will throw a `TaskAbortError` if t

- `fork: (executor: (forkApi: ForkApi) => T | Promise<T>) => ForkedTask<T>`: Launches a "child task" that may be used to accomplish additional work. Accepts any sync or async function as its argument, and returns a `{result, cancel}` object that can be used to check the final status and return value of the child task, or cancel it while in-progress.

Child tasks can be launched, and waited on to collect their return values. The provided `executor` function will be called with a `forkApi` object containing `{pause, delay, signal}`, allowing it to pause or check cancelation status. It can also make use of the `listenerApi` from the listener's scope.
Child tasks can be launched, and waited on to collect their return values. The provided `executor` function will be called asynchronously with a `forkApi` object containing `{pause, delay, signal}`, allowing it to pause or check cancelation status. It can also make use of the `listenerApi` from the listener's scope.

An example of this might be a listener that forks a child task containing an infinite loop that listens for events from a server. The parent then uses `listenerApi.condition()` to wait for a "stop" action, and cancels the child task.

Expand Down
11 changes: 5 additions & 6 deletions packages/action-listener-middleware/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import type {
ForkedTask,
TypedRemoveListener,
TypedStopListening,
TaskResult,
} from './types'
import {
abortControllerWithReason,
Expand Down Expand Up @@ -85,9 +86,6 @@ const createFork = (parentAbortSignal: AbortSignal) => {
return <T>(taskExecutor: ForkedTaskExecutor<T>): ForkedTask<T> => {
assertFunction(taskExecutor, 'taskExecutor')
const childAbortController = new AbortController()
const cancel = () => {
abortControllerWithReason(childAbortController, taskCancelled)
}

const result = runTask<T>(
async (): Promise<T> => {
Expand All @@ -98,16 +96,17 @@ const createFork = (parentAbortSignal: AbortSignal) => {
delay: createDelay(childAbortController.signal),
signal: childAbortController.signal,
})) as T
validateActive(parentAbortSignal)
validateActive(childAbortController.signal)
return result
},
() => abortControllerWithReason(childAbortController, taskCompleted)
)

return {
result,
cancel,
result: createPause<TaskResult<T>>(parentAbortSignal)(result),
cancel() {
abortControllerWithReason(childAbortController, taskCancelled)
},
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion packages/action-listener-middleware/src/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ export const promisifyAbortSignal = (

/**
* Runs a task and returns promise that resolves to {@link TaskResult}.
*
* Second argument is an optional `cleanUp` function that always runs after task.
*
* **Note:** `runTask` runs the executor in the next microtask.
* @returns
*/
export const runTask = async <T>(
Expand Down
54 changes: 26 additions & 28 deletions packages/action-listener-middleware/src/tests/fork.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { EnhancedStore } from '@reduxjs/toolkit'
import { configureStore, createSlice } from '@reduxjs/toolkit'
import { configureStore, createSlice, createAction } from '@reduxjs/toolkit'

import type { PayloadAction } from '@reduxjs/toolkit'
import type { ForkedTaskExecutor, TaskResult } from '../types'
Expand Down Expand Up @@ -99,24 +99,23 @@ describe('fork', () => {
expect(hasRunAsyncExecutor).toBe(true)
})

it('runs forked tasks that are cancelled if parent listener is cancelled', async () => {
test('forkedTask.result rejects TaskAbortError if listener is cancelled', async () => {
const deferredForkedTaskError = deferred()

startListening({
actionCreator: increment,
effect: async (_, listenerApi) => {
async effect(_, listenerApi) {
listenerApi.cancelActiveListeners()
const result = await listenerApi.fork(async () => {
await delay(20)

throw new Error('unreachable code')
}).result
listenerApi
.fork(async () => {
await delay(10)

if (result.status !== 'ok') {
deferredForkedTaskError.resolve(result.error)
} else {
deferredForkedTaskError.reject(new Error('unreachable code'))
}
throw new Error('unreachable code')
})
.result.then(
deferredForkedTaskError.resolve,
deferredForkedTaskError.resolve
)
},
})

Expand Down Expand Up @@ -386,26 +385,25 @@ describe('fork', () => {
})

test('forkApi.pause rejects if listener is cancelled', async () => {
let deferredResult = deferred()
const incrementByInListener = createAction<number>('incrementByInListener')

startListening({
actionCreator: increment,
effect: async (_, listenerApi) => {
actionCreator: incrementByInListener,
async effect({ payload: amountToIncrement }, listenerApi) {
listenerApi.cancelActiveListeners()
const forkedTask = listenerApi.fork(async (forkApi) => {
await forkApi.pause(delay(30))

return 4
})
deferredResult.resolve(await forkedTask.result)
await listenerApi.fork(async (forkApi) => {
await forkApi.pause(delay(10))
listenerApi.dispatch(incrementByAmount(amountToIncrement))
}).result
listenerApi.dispatch(incrementByAmount(2 * amountToIncrement))
},
})

store.dispatch(increment())
store.dispatch(increment())
store.dispatch(incrementByInListener(10))
store.dispatch(incrementByInListener(100))

expect(await deferredResult).toEqual({
status: 'cancelled',
error: new TaskAbortError(listenerCancelled),
})
await delay(50)

expect(store.getState().value).toEqual(300)
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -709,24 +709,27 @@ describe('createListenerMiddleware', () => {
})

test('clear() cancels all running forked tasks', async () => {
const fork1Test = deferred()
const store = configureStore({
reducer: counterSlice.reducer,
middleware: (gDM) => gDM().prepend(middleware),
})

startListening({
actionCreator: testAction1,
async effect(_, { fork }) {
const taskResult = await fork(() => {
return 3
}).result
fork1Test.resolve(taskResult)
async effect(_, { fork, dispatch }) {
await fork(() => dispatch(incrementByAmount(3))).result
dispatch(incrementByAmount(4))
},
})

expect(store.getState().value).toBe(0)
store.dispatch(testAction1('a'))

clearListeners()
store.dispatch(testAction1('b'))

expect(await fork1Test).toHaveProperty('status', 'cancelled')
await Promise.resolve() // Forked tasks run on the next microtask.

expect(store.getState().value).toBe(0)
})
})

Expand Down
13 changes: 13 additions & 0 deletions packages/action-listener-middleware/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,19 @@ export type TaskResult<Value> =
| TaskCancelled

export interface ForkedTask<T> {
/**
* A promise that resolves when the task is either completed or cancelled or rejects
* if parent listener execution is cancelled or completed.
*
* ### Example
* ```ts
* const result = await fork(async (forkApi) => Promise.resolve(4)).result
*
* if(result.status === 'ok') {
* console.log(result.value) // logs 4
* }}
* ```
*/
result: Promise<TaskResult<T>>
/**
* Cancel task if it is in progress or not yet started,
Expand Down