Skip to content

Commit d61a247

Browse files
committed
Fix ReactiveStreamsConsumer for PollableChannel
https://build.spring.io/browse/INT-MASTERSPRING40-985/ * Move `onSubscribe()` callback to the `subscribe()` operator to honor a `request` contract from the target subscriber
1 parent 6d936d9 commit d61a247

File tree

1 file changed

+10
-9
lines changed

1 file changed

+10
-9
lines changed

spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveStreamsConsumer.java

+10-9
Original file line numberDiff line numberDiff line change
@@ -172,16 +172,17 @@ protected void doStart() {
172172
else if (this.subscriber != null) {
173173
this.subscription =
174174
Flux.from(this.publisher)
175-
.doOnComplete(this.subscriber::onComplete)
176-
.doOnSubscribe(this.subscriber::onSubscribe)
177175
.subscribe((data) -> {
178-
try {
179-
this.subscriber.onNext(data);
180-
}
181-
catch (Exception ex) {
182-
this.errorHandler.handleError(ex);
183-
}
184-
});
176+
try {
177+
this.subscriber.onNext(data);
178+
}
179+
catch (Exception ex) {
180+
this.errorHandler.handleError(ex);
181+
}
182+
},
183+
null,
184+
this.subscriber::onComplete,
185+
this.subscriber::onSubscribe);
185186
}
186187
}
187188

0 commit comments

Comments
 (0)