|
28 | 28 | import io.reactivex.Flowable;
|
29 | 29 | import kotlinx.coroutines.CompletableDeferredKt;
|
30 | 30 | import kotlinx.coroutines.Deferred;
|
31 |
| -import kotlinx.coroutines.flow.FlowKt; |
32 |
| -import kotlinx.coroutines.reactive.flow.FlowAsPublisherKt; |
33 |
| -import kotlinx.coroutines.reactive.flow.PublisherAsFlowKt; |
34 | 31 | import org.reactivestreams.Publisher;
|
35 | 32 | import reactor.core.publisher.Flux;
|
36 | 33 | import reactor.core.publisher.Mono;
|
@@ -97,7 +94,7 @@ public ReactiveAdapterRegistry() {
|
97 | 94 | // We can fall back on "reactive-streams-flow-bridge" (once released)
|
98 | 95 |
|
99 | 96 | // Coroutines
|
100 |
| - if (this.reactorPresent && ClassUtils.isPresent("kotlinx.coroutines.reactor.MonoKt", classLoader) && ClassUtils.isPresent("kotlinx.coroutines.reactive.flow.PublisherAsFlowKt", classLoader)) { |
| 97 | + if (this.reactorPresent && ClassUtils.isPresent("kotlinx.coroutines.reactor.MonoKt", classLoader)) { |
101 | 98 | new CoroutinesRegistrar().registerAdapters(this);
|
102 | 99 | }
|
103 | 100 | }
|
@@ -351,9 +348,9 @@ void registerAdapters(ReactiveAdapterRegistry registry) {
|
351 | 348 | source -> CoroutinesUtils.monoToDeferred(Mono.from(source)));
|
352 | 349 |
|
353 | 350 | registry.registerReactiveType(
|
354 |
| - ReactiveTypeDescriptor.multiValue(kotlinx.coroutines.flow.Flow.class, FlowKt::emptyFlow), |
355 |
| - source -> FlowAsPublisherKt.from((kotlinx.coroutines.flow.Flow<?>) source), |
356 |
| - PublisherAsFlowKt::from |
| 351 | + ReactiveTypeDescriptor.multiValue(kotlinx.coroutines.flow.Flow.class, kotlinx.coroutines.flow.FlowKt::emptyFlow), |
| 352 | + source -> kotlinx.coroutines.reactor.FlowKt.asFlux((kotlinx.coroutines.flow.Flow<?>) source), |
| 353 | + kotlinx.coroutines.reactive.FlowKt::asFlow |
357 | 354 | );
|
358 | 355 | }
|
359 | 356 | }
|
|
0 commit comments