Skip to content

Commit 115f950

Browse files
committed
Enable cancelling listeners when unsubscribing
1 parent 5d39ecf commit 115f950

File tree

5 files changed

+225
-78
lines changed

5 files changed

+225
-78
lines changed

docs/api/createListenerMiddleware.mdx

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ const store = configureStore({
144144
Adds a new listener entry to the middleware. Typically used to "statically" add new listeners during application setup.
145145

146146
```ts no-transpile
147-
const startListening = (options: AddListenerOptions) => Unsubscribe
147+
const startListening = (options: AddListenerOptions) => UnsubscribeListener
148148

149149
interface AddListenerOptions {
150150
// Four options for deciding when the listener will run:
@@ -170,6 +170,12 @@ type ListenerPredicate<Action extends AnyAction, State> = (
170170
currentState?: State,
171171
originalState?: State
172172
) => boolean
173+
174+
type UnsubscribeListener = (unsuscribeOptions?: UnsubscribeOptions) => void
175+
176+
interface UnsubscribeOptions {
177+
cancelActive?: true
178+
}
173179
```
174180

175181
**You must provide exactly _one_ of the four options for deciding when the listener will run: `type`, `actionCreator`, `matcher`, or `predicate`**. Every time an action is dispatched, each listener will be checked to see if it should run based on the current action vs the comparison option provided.
@@ -199,36 +205,54 @@ Note that the `predicate` option actually allows matching solely against state-r
199205

200206
The ["matcher" utility functions included in RTK](./matching-utilities.mdx) are acceptable as either the `matcher` or `predicate` option.
201207

202-
The return value is a standard `unsubscribe()` callback that will remove this listener. If you try to add a listener entry but another entry with this exact function reference already exists, no new entry will be added, and the existing `unsubscribe` method will be returned.
208+
The return value is an `unsubscribe()` callback that will remove this listener. By default, unsubscribing will _not_ cancel any active instances of the listener. However, you may also pass in `{cancelActive: true}` to cancel running instances.
209+
210+
If you try to add a listener entry but another entry with this exact function reference already exists, no new entry will be added, and the existing `unsubscribe` method will be returned.
203211

204212
The `effect` callback will receive the current action as its first argument, as well as a "listener API" object similar to the "thunk API" object in `createAsyncThunk`.
205213

206214
All listener predicates and callbacks are checked _after_ the root reducer has already processed the action and updated the state. The `listenerApi.getOriginalState()` method can be used to get the state value that existed before the action that triggered this listener was processed.
207215

208216
### `stopListening`
209217

210-
Removes a given listener. It accepts the same arguments as `startListening()`. It checks for an existing listener entry by comparing the function references of `listener` and the provided `actionCreator/matcher/predicate` function or `type` string.
218+
Removes a given listener entry.
219+
220+
It accepts the same arguments as `startListening()`. It checks for an existing listener entry by comparing the function references of `listener` and the provided `actionCreator/matcher/predicate` function or `type` string.
221+
222+
By default, this does _not_ cancel any active running instances. However, you may also pass in `{cancelActive: true}` to cancel running instances.
211223

212224
```ts no-transpile
213-
const stopListening = (options: AddListenerOptions) => boolean
225+
const stopListening = (options: AddListenerOptions & UnsubscribeOptions) =>
226+
boolean
227+
228+
interface UnsubscribeOptions {
229+
cancelActive?: true
230+
}
214231
```
215232

216233
Returns `true` if the `options.effect` listener has been removed, or `false` if no subscription matching the input provided has been found.
217234

218235
```js
236+
// Examples:
219237
// 1) Action type string
220-
listenerMiddleware.stopListening({ type: 'todos/todoAdded', listener })
238+
listenerMiddleware.stopListening({
239+
type: 'todos/todoAdded',
240+
listener,
241+
cancelActive: true,
242+
})
221243
// 2) RTK action creator
222244
listenerMiddleware.stopListening({ actionCreator: todoAdded, listener })
223245
// 3) RTK matcher function
224-
listenerMiddleware.stopListening({ matcher, listener })
246+
listenerMiddleware.stopListening({ matcher, listener, cancelActive: true })
225247
// 4) Listener predicate
226248
listenerMiddleware.stopListening({ predicate, listener })
227249
```
228250

229251
### `clearListeners`
230252

231-
Removes all current listener entries. This is most likely useful for test scenarios where a single middleware or store instance might be used in multiple tests, as well as some app cleanup situations.
253+
Removes all current listener entries. It also cancels all active running instances of those listeners as well.
254+
255+
This is most likely useful for test scenarios where a single middleware or store instance might be used in multiple tests, as well as some app cleanup situations.
232256

233257
```ts no-transpile
234258
const clearListeners = () => void;
@@ -253,15 +277,19 @@ const unsubscribe = store.dispatch(addListener({ predicate, listener }))
253277

254278
A standard RTK action creator, imported from the package. Dispatching this action tells the middleware to dynamically remove a listener at runtime. Accepts the same arguments as `stopListening()`.
255279

280+
By default, this does _not_ cancel any active running instances. However, you may also pass in `{cancelActive: true}` to cancel running instances.
281+
256282
Returns `true` if the `options.listener` listener has been removed, `false` if no subscription matching the input provided has been found.
257283

258284
```js
259-
store.dispatch(removeListener({ predicate, listener }))
285+
const wasRemoved = store.dispatch(
286+
removeListener({ predicate, listener, cancelActive: true })
287+
)
260288
```
261289

262290
### `clearAllListeners`
263291

264-
A standard RTK action creator, imported from the package. Dispatching this action tells the middleware to dynamically remove all listeners at runtime.
292+
A standard RTK action creator, imported from the package. Dispatching this action tells the middleware to remove all current listener entries. It also cancels all active running instances of those listeners as well.
265293

266294
```js
267295
store.dispatch(clearAllListeners())
@@ -284,7 +312,7 @@ The `listenerApi` object is the second argument to each listener callback. It co
284312

285313
### Listener Subscription Management
286314

287-
- `unsubscribe: () => void`: removes the listener entry from the middleware, and prevent future instances of the listener from running.
315+
- `unsubscribe: () => void`: removes the listener entry from the middleware, and prevent future instances of the listener from running. (This does _not_ cancel any active instances.)
288316
- `subscribe: () => void`: will re-subscribe the listener entry if it was previously removed, or no-op if currently subscribed
289317
- `cancelActiveListeners: () => void`: cancels all other running instances of this same listener _except_ for the one that made this call. (The cancellation will only have a meaningful effect if the other instances are paused using one of the cancellation-aware APIs like `take/cancel/pause/delay` - see "Cancelation and Task Management" in the "Usage" section for more details)
290318
- `signal: AbortSignal`: An [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) whose `aborted` property will be set to `true` if the listener execution is aborted or completed.

packages/toolkit/src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ export type {
162162
TypedAddListener,
163163
TypedStopListening,
164164
TypedRemoveListener,
165-
Unsubscribe,
165+
UnsubscribeListener,
166166
ForkedTaskExecutor,
167167
ForkedTask,
168168
ForkedTaskAPI,

packages/toolkit/src/listenerMiddleware/index.ts

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,15 @@ import type {
1414
FallbackAddListenerOptions,
1515
ListenerEntry,
1616
ListenerErrorHandler,
17-
Unsubscribe,
17+
UnsubscribeListener,
1818
TakePattern,
1919
ListenerErrorInfo,
2020
ForkedTaskExecutor,
2121
ForkedTask,
2222
TypedRemoveListener,
2323
TaskResult,
2424
AbortSignalWithReason,
25+
UnsubscribeOptions,
2526
} from './types'
2627
import {
2728
abortControllerWithReason,
@@ -55,7 +56,7 @@ export type {
5556
TypedAddListener,
5657
TypedStopListening,
5758
TypedRemoveListener,
58-
Unsubscribe,
59+
UnsubscribeListener,
5960
ForkedTaskExecutor,
6061
ForkedTask,
6162
ForkedTaskAPI,
@@ -113,7 +114,11 @@ const createFork = (parentAbortSignal: AbortSignalWithReason<unknown>) => {
113114
}
114115

115116
const createTakePattern = <S>(
116-
startListening: AddListenerOverloads<Unsubscribe, S, Dispatch<AnyAction>>,
117+
startListening: AddListenerOverloads<
118+
UnsubscribeListener,
119+
S,
120+
Dispatch<AnyAction>
121+
>,
117122
signal: AbortSignal
118123
): TakePattern<S> => {
119124
/**
@@ -130,7 +135,7 @@ const createTakePattern = <S>(
130135
validateActive(signal)
131136

132137
// Placeholder unsubscribe function until the listener is added
133-
let unsubscribe: Unsubscribe = () => {}
138+
let unsubscribe: UnsubscribeListener = () => {}
134139

135140
const tuplePromise = new Promise<[AnyAction, S, S]>((resolve) => {
136141
// Inside the Promise, we synchronously add the listener.
@@ -223,11 +228,7 @@ const createClearListenerMiddleware = (
223228
listenerMap: Map<string, ListenerEntry>
224229
) => {
225230
return () => {
226-
listenerMap.forEach((entry) => {
227-
entry.pending.forEach((controller) => {
228-
abortControllerWithReason(controller, listenerCancelled)
229-
})
230-
})
231+
listenerMap.forEach(cancelActiveListeners)
231232

232233
listenerMap.clear()
233234
}
@@ -257,19 +258,19 @@ const safelyNotifyError = (
257258
}
258259

259260
/**
260-
* @alpha
261+
* @public
261262
*/
262263
export const addListener = createAction(
263264
`${alm}/add`
264265
) as TypedAddListener<unknown>
265266

266267
/**
267-
* @alpha
268+
* @public
268269
*/
269270
export const clearAllListeners = createAction(`${alm}/removeAll`)
270271

271272
/**
272-
* @alpha
273+
* @public
273274
*/
274275
export const removeListener = createAction(
275276
`${alm}/remove`
@@ -279,8 +280,16 @@ const defaultErrorHandler: ListenerErrorHandler = (...args: unknown[]) => {
279280
console.error(`${alm}/error`, ...args)
280281
}
281282

283+
function cancelActiveListeners(
284+
entry: ListenerEntry<unknown, Dispatch<AnyAction>>
285+
) {
286+
entry.pending.forEach((controller) => {
287+
abortControllerWithReason(controller, listenerCancelled)
288+
})
289+
}
290+
282291
/**
283-
* @alpha
292+
* @public
284293
*/
285294
export function createListenerMiddleware<
286295
S = unknown,
@@ -296,7 +305,12 @@ export function createListenerMiddleware<
296305
entry.unsubscribe = () => listenerMap.delete(entry!.id)
297306

298307
listenerMap.set(entry.id, entry)
299-
return entry.unsubscribe
308+
return (cancelOptions?: UnsubscribeOptions) => {
309+
entry.unsubscribe()
310+
if (cancelOptions?.cancelActive) {
311+
cancelActiveListeners(entry)
312+
}
313+
}
300314
}
301315

302316
const findListenerEntry = (
@@ -323,7 +337,9 @@ export function createListenerMiddleware<
323337
return insertEntry(entry)
324338
}
325339

326-
const stopListening = (options: FallbackAddListenerOptions): boolean => {
340+
const stopListening = (
341+
options: FallbackAddListenerOptions & UnsubscribeOptions
342+
): boolean => {
327343
const { type, effect, predicate } = getListenerEntryPropsFrom(options)
328344

329345
const entry = findListenerEntry((entry) => {
@@ -335,7 +351,12 @@ export function createListenerMiddleware<
335351
return matchPredicateOrType && entry.effect === effect
336352
})
337353

338-
entry?.unsubscribe()
354+
if (entry) {
355+
entry.unsubscribe()
356+
if (options.cancelActive) {
357+
cancelActiveListeners(entry)
358+
}
359+
}
339360

340361
return !!entry
341362
}

packages/toolkit/src/listenerMiddleware/tests/listenerMiddleware.test.ts

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import type {
2222
ListenerEffectAPI,
2323
TypedAddListener,
2424
TypedStartListening,
25-
Unsubscribe,
25+
UnsubscribeListener,
2626
ListenerMiddleware,
2727
} from '../index'
2828
import type {
@@ -445,7 +445,7 @@ describe('createListenerMiddleware', () => {
445445
})
446446
)
447447

448-
expectType<Unsubscribe>(unsubscribe)
448+
expectType<UnsubscribeListener>(unsubscribe)
449449

450450
store.dispatch(testAction1('a'))
451451

@@ -478,6 +478,80 @@ describe('createListenerMiddleware', () => {
478478
expect(effect.mock.calls).toEqual([[testAction1('a'), middlewareApi]])
479479
})
480480

481+
test('can cancel an active listener when unsubscribing directly', async () => {
482+
let wasCancelled = false
483+
const unsubscribe = startListening({
484+
actionCreator: testAction1,
485+
effect: async (action, listenerApi) => {
486+
try {
487+
await listenerApi.condition(testAction2.match)
488+
} catch (err) {
489+
if (err instanceof TaskAbortError) {
490+
wasCancelled = true
491+
}
492+
}
493+
},
494+
})
495+
496+
store.dispatch(testAction1('a'))
497+
unsubscribe({ cancelActive: true })
498+
expect(wasCancelled).toBe(false)
499+
await delay(10)
500+
expect(wasCancelled).toBe(true)
501+
})
502+
503+
test('can cancel an active listener when unsubscribing via stopListening', async () => {
504+
let wasCancelled = false
505+
const effect = async (action: any, listenerApi: any) => {
506+
try {
507+
await listenerApi.condition(testAction2.match)
508+
} catch (err) {
509+
if (err instanceof TaskAbortError) {
510+
wasCancelled = true
511+
}
512+
}
513+
}
514+
startListening({
515+
actionCreator: testAction1,
516+
effect,
517+
})
518+
519+
store.dispatch(testAction1('a'))
520+
stopListening({ actionCreator: testAction1, effect, cancelActive: true })
521+
expect(wasCancelled).toBe(false)
522+
await delay(10)
523+
expect(wasCancelled).toBe(true)
524+
})
525+
526+
test('can cancel an active listener when unsubscribing via removeListener', async () => {
527+
let wasCancelled = false
528+
const effect = async (action: any, listenerApi: any) => {
529+
try {
530+
await listenerApi.condition(testAction2.match)
531+
} catch (err) {
532+
if (err instanceof TaskAbortError) {
533+
wasCancelled = true
534+
}
535+
}
536+
}
537+
startListening({
538+
actionCreator: testAction1,
539+
effect,
540+
})
541+
542+
store.dispatch(testAction1('a'))
543+
store.dispatch(
544+
removeListener({
545+
actionCreator: testAction1,
546+
effect,
547+
cancelActive: true,
548+
})
549+
)
550+
expect(wasCancelled).toBe(false)
551+
await delay(10)
552+
expect(wasCancelled).toBe(true)
553+
})
554+
481555
const addListenerOptions: [
482556
string,
483557
Omit<

0 commit comments

Comments
 (0)