Skip to content

Commit c4a34fa

Browse files
committed
Improve cancel handling in AbstractListenerReadPublisher
Closes gh-30393
1 parent e3f185a commit c4a34fa

File tree

3 files changed

+22
-4
lines changed

3 files changed

+22
-4
lines changed

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2023 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -123,7 +123,7 @@ public final void onDataAvailable() {
123123
}
124124

125125
/**
126-
* Subclasses can call this method to delegate a contain notification when
126+
* Subclasses can call this method to delegate a container notification when
127127
* all data has been read.
128128
*/
129129
public void onAllDataRead() {
@@ -362,6 +362,12 @@ <T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) {
362362
publisher.errorPending = ex;
363363
publisher.handlePendingCompletionOrError();
364364
}
365+
366+
@Override
367+
<T> void cancel(AbstractListenerReadPublisher<T> publisher) {
368+
publisher.completionPending = true;
369+
publisher.handlePendingCompletionOrError();
370+
}
365371
},
366372

367373
NO_DEMAND {
@@ -435,6 +441,13 @@ <T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) {
435441
publisher.errorPending = ex;
436442
publisher.handlePendingCompletionOrError();
437443
}
444+
445+
@Override
446+
<T> void cancel(AbstractListenerReadPublisher<T> publisher) {
447+
publisher.discardData();
448+
publisher.completionPending = true;
449+
publisher.handlePendingCompletionOrError();
450+
}
438451
},
439452

440453
COMPLETED {

spring-web/src/test/java/org/springframework/http/server/reactive/ListenerReadPublisherTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@ protected void checkOnDataAvailable() {
104104

105105
@Override
106106
protected DataBuffer read() {
107+
if (this.discardCalls != 0) {
108+
return null;
109+
}
107110
this.readCalls++;
108111
return mock();
109112
}

spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -290,8 +290,10 @@ else if (rsReadLogger.isTraceEnabled()) {
290290

291291
@Override
292292
protected void discardData() {
293+
Queue<Object> queue = this.pendingMessages;
294+
this.pendingMessages = Queues.empty().get(); // prevent further reading
293295
while (true) {
294-
WebSocketMessage message = (WebSocketMessage) this.pendingMessages.poll();
296+
WebSocketMessage message = (WebSocketMessage) queue.poll();
295297
if (message == null) {
296298
return;
297299
}

0 commit comments

Comments
 (0)