Skip to content

Enable cancelling active listeners when unsubscribing #2078

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 43 additions & 12 deletions docs/api/createListenerMiddleware.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ const store = configureStore({
Adds a new listener entry to the middleware. Typically used to "statically" add new listeners during application setup.

```ts no-transpile
const startListening = (options: AddListenerOptions) => Unsubscribe
const startListening = (options: AddListenerOptions) => UnsubscribeListener

interface AddListenerOptions {
// Four options for deciding when the listener will run:
Expand All @@ -170,6 +170,14 @@ type ListenerPredicate<Action extends AnyAction, State> = (
currentState?: State,
originalState?: State
) => boolean

type UnsubscribeListener = (
unsuscribeOptions?: UnsubscribeListenerOptions
) => void

interface UnsubscribeListenerOptions {
cancelActive?: true
}
```

**You must provide exactly _one_ of the four options for deciding when the listener will run: `type`, `actionCreator`, `matcher`, or `predicate`**. Every time an action is dispatched, each listener will be checked to see if it should run based on the current action vs the comparison option provided.
Expand Down Expand Up @@ -199,36 +207,55 @@ Note that the `predicate` option actually allows matching solely against state-r

The ["matcher" utility functions included in RTK](./matching-utilities.mdx) are acceptable as either the `matcher` or `predicate` option.

The return value is a standard `unsubscribe()` callback that will remove this listener. If you try to add a listener entry but another entry with this exact function reference already exists, no new entry will be added, and the existing `unsubscribe` method will be returned.
The return value is an `unsubscribe()` callback that will remove this listener. By default, unsubscribing will _not_ cancel any active instances of the listener. However, you may also pass in `{cancelActive: true}` to cancel running instances.

If you try to add a listener entry but another entry with this exact function reference already exists, no new entry will be added, and the existing `unsubscribe` method will be returned.

The `effect` callback will receive the current action as its first argument, as well as a "listener API" object similar to the "thunk API" object in `createAsyncThunk`.

All listener predicates and callbacks are checked _after_ the root reducer has already processed the action and updated the state. The `listenerApi.getOriginalState()` method can be used to get the state value that existed before the action that triggered this listener was processed.

### `stopListening`

Removes a given listener. It accepts the same arguments as `startListening()`. It checks for an existing listener entry by comparing the function references of `listener` and the provided `actionCreator/matcher/predicate` function or `type` string.
Removes a given listener entry.

It accepts the same arguments as `startListening()`. It checks for an existing listener entry by comparing the function references of `listener` and the provided `actionCreator/matcher/predicate` function or `type` string.

By default, this does _not_ cancel any active running instances. However, you may also pass in `{cancelActive: true}` to cancel running instances.

```ts no-transpile
const stopListening = (options: AddListenerOptions) => boolean
const stopListening = (
options: AddListenerOptions & UnsubscribeListenerOptions
) => boolean

interface UnsubscribeListenerOptions {
cancelActive?: true
}
```

Returns `true` if the `options.effect` listener has been removed, or `false` if no subscription matching the input provided has been found.

```js
// Examples:
// 1) Action type string
listenerMiddleware.stopListening({ type: 'todos/todoAdded', listener })
listenerMiddleware.stopListening({
type: 'todos/todoAdded',
listener,
cancelActive: true,
})
// 2) RTK action creator
listenerMiddleware.stopListening({ actionCreator: todoAdded, listener })
// 3) RTK matcher function
listenerMiddleware.stopListening({ matcher, listener })
listenerMiddleware.stopListening({ matcher, listener, cancelActive: true })
// 4) Listener predicate
listenerMiddleware.stopListening({ predicate, listener })
```

### `clearListeners`

Removes all current listener entries. This is most likely useful for test scenarios where a single middleware or store instance might be used in multiple tests, as well as some app cleanup situations.
Removes all current listener entries. It also cancels all active running instances of those listeners as well.

This is most likely useful for test scenarios where a single middleware or store instance might be used in multiple tests, as well as some app cleanup situations.

```ts no-transpile
const clearListeners = () => void;
Expand All @@ -253,18 +280,22 @@ const unsubscribe = store.dispatch(addListener({ predicate, listener }))

A standard RTK action creator, imported from the package. Dispatching this action tells the middleware to dynamically remove a listener at runtime. Accepts the same arguments as `stopListening()`.

By default, this does _not_ cancel any active running instances. However, you may also pass in `{cancelActive: true}` to cancel running instances.

Returns `true` if the `options.listener` listener has been removed, `false` if no subscription matching the input provided has been found.

```js
store.dispatch(removeListener({ predicate, listener }))
const wasRemoved = store.dispatch(
removeListener({ predicate, listener, cancelActive: true })
)
```

### `removeAllListeners`
### `clearAllListeners`

A standard RTK action creator, imported from the package. Dispatching this action tells the middleware to dynamically remove all listeners at runtime.
A standard RTK action creator, imported from the package. Dispatching this action tells the middleware to remove all current listener entries. It also cancels all active running instances of those listeners as well.

```js
store.dispatch(removeAllListeners())
store.dispatch(clearAllListeners())
```

## Listener API
Expand All @@ -284,7 +315,7 @@ The `listenerApi` object is the second argument to each listener callback. It co

### Listener Subscription Management

- `unsubscribe: () => void`: removes the listener entry from the middleware, and prevent future instances of the listener from running.
- `unsubscribe: () => void`: removes the listener entry from the middleware, and prevent future instances of the listener from running. (This does _not_ cancel any active instances.)
- `subscribe: () => void`: will re-subscribe the listener entry if it was previously removed, or no-op if currently subscribed
- `cancelActiveListeners: () => void`: cancels all other running instances of this same listener _except_ for the one that made this call. (The cancellation will only have a meaningful effect if the other instances are paused using one of the cancellation-aware APIs like `take/cancel/pause/delay` - see "Cancelation and Task Management" in the "Usage" section for more details)
- `signal: AbortSignal`: An [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) whose `aborted` property will be set to `true` if the listener execution is aborted or completed.
Expand Down
5 changes: 3 additions & 2 deletions packages/toolkit/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ export type {
TypedAddListener,
TypedStopListening,
TypedRemoveListener,
Unsubscribe,
UnsubscribeListener,
UnsubscribeListenerOptions,
ForkedTaskExecutor,
ForkedTask,
ForkedTaskAPI,
Expand All @@ -178,6 +179,6 @@ export {
createListenerMiddleware,
addListener,
removeListener,
removeAllListeners,
clearAllListeners,
TaskAbortError,
} from './listenerMiddleware/index'
58 changes: 40 additions & 18 deletions packages/toolkit/src/listenerMiddleware/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ import type {
FallbackAddListenerOptions,
ListenerEntry,
ListenerErrorHandler,
Unsubscribe,
UnsubscribeListener,
TakePattern,
ListenerErrorInfo,
ForkedTaskExecutor,
ForkedTask,
TypedRemoveListener,
TaskResult,
AbortSignalWithReason,
UnsubscribeListenerOptions,
} from './types'
import {
abortControllerWithReason,
Expand Down Expand Up @@ -55,7 +56,8 @@ export type {
TypedAddListener,
TypedStopListening,
TypedRemoveListener,
Unsubscribe,
UnsubscribeListener,
UnsubscribeListenerOptions,
ForkedTaskExecutor,
ForkedTask,
ForkedTaskAPI,
Expand Down Expand Up @@ -113,7 +115,11 @@ const createFork = (parentAbortSignal: AbortSignalWithReason<unknown>) => {
}

const createTakePattern = <S>(
startListening: AddListenerOverloads<Unsubscribe, S, Dispatch<AnyAction>>,
startListening: AddListenerOverloads<
UnsubscribeListener,
S,
Dispatch<AnyAction>
>,
signal: AbortSignal
): TakePattern<S> => {
/**
Expand All @@ -130,7 +136,7 @@ const createTakePattern = <S>(
validateActive(signal)

// Placeholder unsubscribe function until the listener is added
let unsubscribe: Unsubscribe = () => {}
let unsubscribe: UnsubscribeListener = () => {}

const tuplePromise = new Promise<[AnyAction, S, S]>((resolve) => {
// Inside the Promise, we synchronously add the listener.
Expand Down Expand Up @@ -223,11 +229,7 @@ const createClearListenerMiddleware = (
listenerMap: Map<string, ListenerEntry>
) => {
return () => {
listenerMap.forEach((entry) => {
entry.pending.forEach((controller) => {
abortControllerWithReason(controller, listenerCancelled)
})
})
listenerMap.forEach(cancelActiveListeners)

listenerMap.clear()
}
Expand Down Expand Up @@ -257,19 +259,19 @@ const safelyNotifyError = (
}

/**
* @alpha
* @public
*/
export const addListener = createAction(
`${alm}/add`
) as TypedAddListener<unknown>

/**
* @alpha
* @public
*/
export const removeAllListeners = createAction(`${alm}/removeAll`)
export const clearAllListeners = createAction(`${alm}/removeAll`)

/**
* @alpha
* @public
*/
export const removeListener = createAction(
`${alm}/remove`
Expand All @@ -279,8 +281,16 @@ const defaultErrorHandler: ListenerErrorHandler = (...args: unknown[]) => {
console.error(`${alm}/error`, ...args)
}

const cancelActiveListeners = (
entry: ListenerEntry<unknown, Dispatch<AnyAction>>
) => {
entry.pending.forEach((controller) => {
abortControllerWithReason(controller, listenerCancelled)
})
}

/**
* @alpha
* @public
*/
export function createListenerMiddleware<
S = unknown,
Expand All @@ -296,7 +306,12 @@ export function createListenerMiddleware<
entry.unsubscribe = () => listenerMap.delete(entry!.id)

listenerMap.set(entry.id, entry)
return entry.unsubscribe
return (cancelOptions?: UnsubscribeListenerOptions) => {
entry.unsubscribe()
if (cancelOptions?.cancelActive) {
cancelActiveListeners(entry)
}
}
}

const findListenerEntry = (
Expand All @@ -323,7 +338,9 @@ export function createListenerMiddleware<
return insertEntry(entry)
}

const stopListening = (options: FallbackAddListenerOptions): boolean => {
const stopListening = (
options: FallbackAddListenerOptions & UnsubscribeListenerOptions
): boolean => {
const { type, effect, predicate } = getListenerEntryPropsFrom(options)

const entry = findListenerEntry((entry) => {
Expand All @@ -335,7 +352,12 @@ export function createListenerMiddleware<
return matchPredicateOrType && entry.effect === effect
})

entry?.unsubscribe()
if (entry) {
entry.unsubscribe()
if (options.cancelActive) {
cancelActiveListeners(entry)
}
}

return !!entry
}
Expand Down Expand Up @@ -405,7 +427,7 @@ export function createListenerMiddleware<
return startListening(action.payload)
}

if (removeAllListeners.match(action)) {
if (clearAllListeners.match(action)) {
clearListenerMiddleware()
return
}
Expand Down
Loading