44
44
* {@code Observable}, and others.
45
45
*
46
46
* <p>By default, depending on classpath availability, adapters are registered
47
- * for Reactor, RxJava 2/3, or RxJava 1 (+ RxJava Reactive Streams bridge),
48
- * {@link CompletableFuture}, Java 9+ {@code Flow.Publisher}, and Kotlin
49
- * Coroutines' {@code Deferred} and {@code Flow}.
47
+ * for Reactor, RxJava 3, {@link CompletableFuture}, {@code Flow.Publisher},
48
+ * and Kotlin Coroutines' {@code Deferred} and {@code Flow}.
50
49
*
51
- * <p><strong>Note:</strong> As of Spring Framework 5.3, support for RxJava 1.x
52
- * is deprecated in favor of RxJava 2 and 3.
50
+ * <p><strong>Note:</strong> As of Spring Framework 5.3.11 , support for
51
+ * RxJava 1.x and 2.x is deprecated in favor of RxJava 3.
53
52
*
54
53
* @author Rossen Stoyanchev
55
54
* @author Sebastien Deleuze
@@ -103,15 +102,13 @@ public ReactiveAdapterRegistry() {
103
102
}
104
103
}
105
104
106
- // RxJava1 (deprecated)
105
+ // RxJava
107
106
if (rxjava1Present ) {
108
107
new RxJava1Registrar ().registerAdapters (this );
109
108
}
110
- // RxJava2
111
109
if (rxjava2Present ) {
112
110
new RxJava2Registrar ().registerAdapters (this );
113
111
}
114
- // RxJava3
115
112
if (rxjava3Present ) {
116
113
new RxJava3Registrar ().registerAdapters (this );
117
114
}
@@ -219,6 +216,29 @@ public static ReactiveAdapterRegistry getSharedInstance() {
219
216
}
220
217
221
218
219
+ /**
220
+ * ReactiveAdapter variant that wraps adapted Publishers as {@link Flux} or
221
+ * {@link Mono} depending on {@link ReactiveTypeDescriptor#isMultiValue()}.
222
+ * This is important in places where only the stream and stream element type
223
+ * information is available like encoders and decoders.
224
+ */
225
+ private static class ReactorAdapter extends ReactiveAdapter {
226
+
227
+ ReactorAdapter (ReactiveTypeDescriptor descriptor ,
228
+ Function <Object , Publisher <?>> toPublisherFunction ,
229
+ Function <Publisher <?>, Object > fromPublisherFunction ) {
230
+
231
+ super (descriptor , toPublisherFunction , fromPublisherFunction );
232
+ }
233
+
234
+ @ Override
235
+ public <T > Publisher <T > toPublisher (@ Nullable Object source ) {
236
+ Publisher <T > publisher = super .toPublisher (source );
237
+ return (isMultiValue () ? Flux .from (publisher ) : Mono .from (publisher ));
238
+ }
239
+ }
240
+
241
+
222
242
private static class ReactorRegistrar {
223
243
224
244
void registerAdapters (ReactiveAdapterRegistry registry ) {
@@ -227,8 +247,7 @@ void registerAdapters(ReactiveAdapterRegistry registry) {
227
247
registry .registerReactiveType (
228
248
ReactiveTypeDescriptor .singleOptionalValue (Mono .class , Mono ::empty ),
229
249
source -> (Mono <?>) source ,
230
- Mono ::from
231
- );
250
+ Mono ::from );
232
251
233
252
registry .registerReactiveType (
234
253
ReactiveTypeDescriptor .multiValue (Flux .class , Flux ::empty ),
@@ -243,8 +262,15 @@ void registerAdapters(ReactiveAdapterRegistry registry) {
243
262
registry .registerReactiveType (
244
263
ReactiveTypeDescriptor .nonDeferredAsyncValue (CompletionStage .class , EmptyCompletableFuture ::new ),
245
264
source -> Mono .fromCompletionStage ((CompletionStage <?>) source ),
246
- source -> Mono .from (source ).toFuture ()
247
- );
265
+ source -> Mono .from (source ).toFuture ());
266
+ }
267
+ }
268
+
269
+
270
+ private static class EmptyCompletableFuture <T > extends CompletableFuture <T > {
271
+
272
+ EmptyCompletableFuture () {
273
+ complete (null );
248
274
}
249
275
}
250
276
@@ -268,8 +294,7 @@ void registerAdapter(ReactiveAdapterRegistry registry) {
268
294
registry .registerReactiveType (
269
295
ReactiveTypeDescriptor .multiValue (publisherClass , () -> emptyFlow ),
270
296
source -> (Publisher <?>) ReflectionUtils .invokeMethod (toFluxMethod , null , source ),
271
- publisher -> ReflectionUtils .invokeMethod (toFlowMethod , null , publisher )
272
- );
297
+ publisher -> ReflectionUtils .invokeMethod (toFlowMethod , null , publisher ));
273
298
}
274
299
catch (Throwable ex ) {
275
300
// Ignore
@@ -284,18 +309,17 @@ void registerAdapters(ReactiveAdapterRegistry registry) {
284
309
registry .registerReactiveType (
285
310
ReactiveTypeDescriptor .multiValue (rx .Observable .class , rx .Observable ::empty ),
286
311
source -> RxReactiveStreams .toPublisher ((rx .Observable <?>) source ),
287
- RxReactiveStreams ::toObservable
288
- );
312
+ RxReactiveStreams ::toObservable );
313
+
289
314
registry .registerReactiveType (
290
315
ReactiveTypeDescriptor .singleRequiredValue (rx .Single .class ),
291
316
source -> RxReactiveStreams .toPublisher ((rx .Single <?>) source ),
292
- RxReactiveStreams ::toSingle
293
- );
317
+ RxReactiveStreams ::toSingle );
318
+
294
319
registry .registerReactiveType (
295
320
ReactiveTypeDescriptor .noValue (rx .Completable .class , rx .Completable ::complete ),
296
321
source -> RxReactiveStreams .toPublisher ((rx .Completable ) source ),
297
- RxReactiveStreams ::toCompletable
298
- );
322
+ RxReactiveStreams ::toCompletable );
299
323
}
300
324
}
301
325
@@ -306,29 +330,28 @@ void registerAdapters(ReactiveAdapterRegistry registry) {
306
330
registry .registerReactiveType (
307
331
ReactiveTypeDescriptor .multiValue (io .reactivex .Flowable .class , io .reactivex .Flowable ::empty ),
308
332
source -> (io .reactivex .Flowable <?>) source ,
309
- io .reactivex .Flowable ::fromPublisher
310
- );
333
+ io .reactivex .Flowable ::fromPublisher );
334
+
311
335
registry .registerReactiveType (
312
336
ReactiveTypeDescriptor .multiValue (io .reactivex .Observable .class , io .reactivex .Observable ::empty ),
313
337
source -> ((io .reactivex .Observable <?>) source ).toFlowable (io .reactivex .BackpressureStrategy .BUFFER ),
314
- io .reactivex .Observable ::fromPublisher
315
- );
338
+ io .reactivex .Observable ::fromPublisher );
339
+
316
340
registry .registerReactiveType (
317
341
ReactiveTypeDescriptor .singleRequiredValue (io .reactivex .Single .class ),
318
342
source -> ((io .reactivex .Single <?>) source ).toFlowable (),
319
- io .reactivex .Single ::fromPublisher
320
- );
343
+ io .reactivex .Single ::fromPublisher );
344
+
321
345
registry .registerReactiveType (
322
346
ReactiveTypeDescriptor .singleOptionalValue (io .reactivex .Maybe .class , io .reactivex .Maybe ::empty ),
323
347
source -> ((io .reactivex .Maybe <?>) source ).toFlowable (),
324
348
source -> io .reactivex .Flowable .fromPublisher (source )
325
- .toObservable ().singleElement ()
326
- );
349
+ .toObservable ().singleElement ());
350
+
327
351
registry .registerReactiveType (
328
352
ReactiveTypeDescriptor .noValue (io .reactivex .Completable .class , io .reactivex .Completable ::complete ),
329
353
source -> ((io .reactivex .Completable ) source ).toFlowable (),
330
- io .reactivex .Completable ::fromPublisher
331
- );
354
+ io .reactivex .Completable ::fromPublisher );
332
355
}
333
356
}
334
357
@@ -341,66 +364,34 @@ void registerAdapters(ReactiveAdapterRegistry registry) {
341
364
io .reactivex .rxjava3 .core .Flowable .class ,
342
365
io .reactivex .rxjava3 .core .Flowable ::empty ),
343
366
source -> (io .reactivex .rxjava3 .core .Flowable <?>) source ,
344
- io .reactivex .rxjava3 .core .Flowable ::fromPublisher
345
- );
367
+ io .reactivex .rxjava3 .core .Flowable ::fromPublisher );
368
+
346
369
registry .registerReactiveType (
347
370
ReactiveTypeDescriptor .multiValue (
348
371
io .reactivex .rxjava3 .core .Observable .class ,
349
372
io .reactivex .rxjava3 .core .Observable ::empty ),
350
373
source -> ((io .reactivex .rxjava3 .core .Observable <?>) source ).toFlowable (
351
374
io .reactivex .rxjava3 .core .BackpressureStrategy .BUFFER ),
352
- io .reactivex .rxjava3 .core .Observable ::fromPublisher
353
- );
375
+ io .reactivex .rxjava3 .core .Observable ::fromPublisher );
376
+
354
377
registry .registerReactiveType (
355
378
ReactiveTypeDescriptor .singleRequiredValue (io .reactivex .rxjava3 .core .Single .class ),
356
379
source -> ((io .reactivex .rxjava3 .core .Single <?>) source ).toFlowable (),
357
- io .reactivex .rxjava3 .core .Single ::fromPublisher
358
- );
380
+ io .reactivex .rxjava3 .core .Single ::fromPublisher );
381
+
359
382
registry .registerReactiveType (
360
383
ReactiveTypeDescriptor .singleOptionalValue (
361
384
io .reactivex .rxjava3 .core .Maybe .class ,
362
385
io .reactivex .rxjava3 .core .Maybe ::empty ),
363
386
source -> ((io .reactivex .rxjava3 .core .Maybe <?>) source ).toFlowable (),
364
- io .reactivex .rxjava3 .core .Maybe ::fromPublisher
365
- );
387
+ io .reactivex .rxjava3 .core .Maybe ::fromPublisher );
388
+
366
389
registry .registerReactiveType (
367
390
ReactiveTypeDescriptor .noValue (
368
391
io .reactivex .rxjava3 .core .Completable .class ,
369
392
io .reactivex .rxjava3 .core .Completable ::complete ),
370
393
source -> ((io .reactivex .rxjava3 .core .Completable ) source ).toFlowable (),
371
- io .reactivex .rxjava3 .core .Completable ::fromPublisher
372
- );
373
- }
374
- }
375
-
376
-
377
- /**
378
- * ReactiveAdapter variant that wraps adapted Publishers as {@link Flux} or
379
- * {@link Mono} depending on {@link ReactiveTypeDescriptor#isMultiValue()}.
380
- * This is important in places where only the stream and stream element type
381
- * information is available like encoders and decoders.
382
- */
383
- private static class ReactorAdapter extends ReactiveAdapter {
384
-
385
- ReactorAdapter (ReactiveTypeDescriptor descriptor ,
386
- Function <Object , Publisher <?>> toPublisherFunction ,
387
- Function <Publisher <?>, Object > fromPublisherFunction ) {
388
-
389
- super (descriptor , toPublisherFunction , fromPublisherFunction );
390
- }
391
-
392
- @ Override
393
- public <T > Publisher <T > toPublisher (@ Nullable Object source ) {
394
- Publisher <T > publisher = super .toPublisher (source );
395
- return (isMultiValue () ? Flux .from (publisher ) : Mono .from (publisher ));
396
- }
397
- }
398
-
399
-
400
- private static class EmptyCompletableFuture <T > extends CompletableFuture <T > {
401
-
402
- EmptyCompletableFuture () {
403
- complete (null );
394
+ io .reactivex .rxjava3 .core .Completable ::fromPublisher );
404
395
}
405
396
}
406
397
@@ -418,8 +409,7 @@ void registerAdapters(ReactiveAdapterRegistry registry) {
418
409
registry .registerReactiveType (
419
410
ReactiveTypeDescriptor .multiValue (kotlinx .coroutines .flow .Flow .class , kotlinx .coroutines .flow .FlowKt ::emptyFlow ),
420
411
source -> kotlinx .coroutines .reactor .ReactorFlowKt .asFlux ((kotlinx .coroutines .flow .Flow <?>) source ),
421
- kotlinx .coroutines .reactive .ReactiveFlowKt ::asFlow
422
- );
412
+ kotlinx .coroutines .reactive .ReactiveFlowKt ::asFlow );
423
413
}
424
414
}
425
415
@@ -432,16 +422,14 @@ void registerAdapters(ReactiveAdapterRegistry registry) {
432
422
io .smallrye .mutiny .Uni .class ,
433
423
() -> io .smallrye .mutiny .Uni .createFrom ().nothing ()),
434
424
uni -> ((io .smallrye .mutiny .Uni <?>) uni ).convert ().toPublisher (),
435
- publisher -> io .smallrye .mutiny .Uni .createFrom ().publisher (publisher )
436
- );
425
+ publisher -> io .smallrye .mutiny .Uni .createFrom ().publisher (publisher ));
437
426
438
427
registry .registerReactiveType (
439
428
ReactiveTypeDescriptor .multiValue (
440
429
io .smallrye .mutiny .Multi .class ,
441
430
() -> io .smallrye .mutiny .Multi .createFrom ().empty ()),
442
431
multi -> (io .smallrye .mutiny .Multi <?>) multi ,
443
- publisher -> io .smallrye .mutiny .Multi .createFrom ().publisher (publisher )
444
- );
432
+ publisher -> io .smallrye .mutiny .Multi .createFrom ().publisher (publisher ));
445
433
}
446
434
}
447
435
@@ -459,7 +447,6 @@ public static class SpringCoreBlockHoundIntegration implements BlockHoundIntegra
459
447
460
448
@ Override
461
449
public void applyTo (BlockHound .Builder builder ) {
462
-
463
450
// Avoid hard references potentially anywhere in spring-core (no need for structural dependency)
464
451
465
452
builder .allowBlockingCallsInside (
0 commit comments