Skip to content

Commit 1788867

Browse files
committed
4.x: Add virtualCreate and virtualTransform to Flowable
1 parent a5d0c55 commit 1788867

18 files changed

+1272
-3
lines changed

src/main/java/io/reactivex/rxjava4/core/Flowable.java

Lines changed: 120 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,9 @@
1515

1616
import java.util.*;
1717
import java.util.concurrent.*;
18+
import java.util.concurrent.Flow.*;
1819
import java.util.stream.*;
1920

20-
import static java.util.concurrent.Flow.*;
21-
2221
import io.reactivex.rxjava4.annotations.*;
2322
import io.reactivex.rxjava4.disposables.*;
2423
import io.reactivex.rxjava4.exceptions.*;
@@ -34,6 +33,7 @@
3433
import io.reactivex.rxjava4.internal.schedulers.ImmediateThinScheduler;
3534
import io.reactivex.rxjava4.internal.subscribers.*;
3635
import io.reactivex.rxjava4.internal.util.*;
36+
import io.reactivex.rxjava4.internal.virtual.*;
3737
import io.reactivex.rxjava4.operators.ScalarSupplier;
3838
import io.reactivex.rxjava4.parallel.ParallelFlowable;
3939
import io.reactivex.rxjava4.plugins.RxJavaPlugins;
@@ -20896,4 +20896,122 @@ public final Stream<T> blockingStream(int prefetch) {
2089620896
ObjectHelper.verifyPositive(prefetch, "prefetch");
2089720897
return RxJavaPlugins.onAssembly(new FlowableFlatMapStream<>(this, mapper, prefetch));
2089820898
}
20899+
20900+
/**
20901+
* Construct a {@code Flowable} and use the given {@code generator}
20902+
* to generate items on demand while running on the given {@link ExecutorService}.
20903+
* <p>
20904+
* <dl>
20905+
* <dt><b>Backpressure:</b></dt>
20906+
* <dd>This operator honors backpressure from downstream and blocks the emitter if
20907+
* the downstream is not ready.
20908+
* </dd>
20909+
* <dt><b>Scheduler:</b></dt>
20910+
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
20911+
* </dl>
20912+
* <p>
20913+
* Note that backpressure is handled via blocking so it is recommended the provided
20914+
* {@code ExecutorService} uses virtual threads, such as the one returned by
20915+
* {@link Executors#newVirtualThreadPerTaskExecutor()}.
20916+
* <p>
20917+
* Examples:
20918+
* <pre><code>
20919+
* try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
20920+
* Flowable.&lt;Integer&gt;virtualCreate(emitter -> {
20921+
* for (int i = 0; i &lt; 10; i++) {
20922+
* Thread.sleep(1000);
20923+
* emitter.emit(i);
20924+
* }
20925+
* }, executor)
20926+
* .subscribe(
20927+
* System.out::println,
20928+
* Throwable::printStackTrace,
20929+
* () -&gt; System.out.println("Done")
20930+
* );
20931+
* }
20932+
* </code></pre>
20933+
* @param <T> the element type to emit
20934+
* @param generator the callback used to generate items on demand by the downstream
20935+
* @param executor the target {@code ExecutorService} to use for running the callback
20936+
* @return the new {@code Flowable} instance
20937+
* @throws NullPointerException if {@code generator} or {@code executor} is {@code null}
20938+
* @since 4.0.0
20939+
*/
20940+
@CheckReturnValue
20941+
@BackpressureSupport(BackpressureKind.FULL)
20942+
@SchedulerSupport(SchedulerSupport.NONE)
20943+
@NonNull
20944+
public static <@NonNull T> Flowable<T> virtualCreate(@NonNull VirtualGenerator<T> generator, @NonNull ExecutorService executor) {
20945+
Objects.requireNonNull(generator, "generator is null");
20946+
Objects.requireNonNull(executor, "executor is null");
20947+
return RxJavaPlugins.onAssembly(new FlowableVirtualCreateExecutor<>(generator, executor));
20948+
}
20949+
20950+
/**
20951+
* Returns a {@code Flowable} that turns an upstream item an upstream item into
20952+
* zero or more downstream values by running on the given {@link ExecutorService}.
20953+
* <p>
20954+
* <dl>
20955+
* <dt><b>Backpressure:</b></dt>
20956+
* <dd>This operator honors backpressure from downstream and blocks the emitter if
20957+
* the downstream is not ready.
20958+
* </dd>
20959+
* <dt><b>Scheduler:</b></dt>
20960+
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
20961+
* </dl>
20962+
* <p>
20963+
* Note that backpressure is handled via blocking so it is recommended the provided
20964+
* {@code ExecutorService} uses virtual threads, such as the one returned by
20965+
* {@link Executors#newVirtualThreadPerTaskExecutor()}.
20966+
* @param <R> the downstream element type
20967+
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)} is invoked for each upstream item
20968+
* @param executor the target {@code ExecutorService} to use for running the callback
20969+
* @return the new {@code Flowable} instance
20970+
* @throws NullPointerException if {@code transformer} or {@code executor} is {@code null}
20971+
* @since 4.0.0
20972+
*/
20973+
@CheckReturnValue
20974+
@BackpressureSupport(BackpressureKind.FULL)
20975+
@SchedulerSupport(SchedulerSupport.NONE)
20976+
@NonNull
20977+
public final <@NonNull R> Flowable<R> virtualTransform(@NonNull VirtualTransformer<T, R> transformer, @NonNull ExecutorService executor) {
20978+
return virtualTransform(transformer, executor, Flowable.bufferSize());
20979+
}
20980+
20981+
/**
20982+
* Returns a {@code Flowable} that turns an upstream item into zero or more downstream
20983+
* values by running on the given {@link ExecutorService}.
20984+
* <p>
20985+
* <dl>
20986+
* <dt><b>Backpressure:</b></dt>
20987+
* <dd>This operator honors backpressure from downstream and blocks the emitter if
20988+
* the downstream is not ready.
20989+
* </dd>
20990+
* <dt><b>Scheduler:</b></dt>
20991+
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
20992+
* </dl>
20993+
* <p>
20994+
* Note that backpressure is handled via blocking so it is recommended the provided
20995+
* {@code ExecutorService} uses virtual threads, such as the one returned by
20996+
* {@link Executors#newVirtualThreadPerTaskExecutor()}.
20997+
* @param <R> the downstream element type
20998+
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)} is invoked for each upstream item
20999+
* @param executor the target {@code ExecutorService} to use for running the callback
21000+
* @param prefetch the number of items to fetch from the upstream.
21001+
* @return the new {@code Flowable} instance
21002+
* @throws NullPointerException if {@code transformer} or {@code executor} is {@code null}
21003+
* @throws IllegalArgumentException if {@code prefetch} is non-positive
21004+
* @since 4.0.0
21005+
*/
21006+
@CheckReturnValue
21007+
@BackpressureSupport(BackpressureKind.FULL)
21008+
@SchedulerSupport(SchedulerSupport.NONE)
21009+
@NonNull
21010+
public final <@NonNull R> Flowable<R> virtualTransform(@NonNull VirtualTransformer<T, R> transformer, @NonNull ExecutorService executor, int prefetch) {
21011+
Objects.requireNonNull(transformer, "transformer is null");
21012+
Objects.requireNonNull(executor, "executor is null");
21013+
ObjectHelper.verifyPositive(prefetch, "prefetch");
21014+
return new FlowableVirtualTransformExecutor<>(this, transformer, executor, prefetch);
21015+
}
21016+
2089921017
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.core;
15+
16+
/**
17+
* Interface handed to user code in {@link Flowable#virtualCreate(VirtualGenerator, java.util.concurrent.ExecutorService)} callback.
18+
* @param <T> the element type to emit
19+
* @since 4.0.0
20+
*/
21+
@FunctionalInterface
22+
public interface VirtualEmitter<T> {
23+
24+
/**
25+
* Signal the next item
26+
* @param item the item to signal
27+
* @throws Throwable an arbitrary exception if the downstream cancelled
28+
*/
29+
void emit(T item) throws Throwable;
30+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.core;
15+
16+
/**
17+
* Interface to implement to produce elements when asked by
18+
* {@link Flowable#virtaulCreate(VirtualGenerator, java.util.concurrent.ExecutorService)}.
19+
* <p>
20+
* To signal {@code onComplete}, return normally from {@link #generate(VirtualEmitter)}.
21+
* To signal {@code onError}, throw any exception from {@link #generate(VirtualEmitter)}.
22+
* @param <T> the element type generated
23+
* @since 4.0.0
24+
*/
25+
@FunctionalInterface
26+
public interface VirtualGenerator<T> {
27+
28+
/**
29+
* The method to implement and start emitting items.
30+
* @param emitter use {@link VirtualEmitter#emit(Object)} to generate values
31+
* @throws Throwable if the generator wishes to signal {@code onError}.
32+
*/
33+
void generate(VirtualEmitter<T> emitter) throws Throwable;
34+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.core;
15+
/**
16+
* Interface called by the {@link Flowable#virtualTransform(VirtualTransformer, java.util.concurrent.ExecutorService)}
17+
* operator to generate any number of output values based of the current input of the upstream.
18+
*
19+
* @param <T> the source value type
20+
* @param <R> the result value type
21+
* @since 4.0.0
22+
*/
23+
@FunctionalInterface
24+
public interface VirtualTransformer<T, R> {
25+
26+
/**
27+
* Implement this method to generate any number of items via
28+
* {@link VirtualEmitter#emit(Object)}.
29+
*
30+
* @param value the upstream value
31+
* @param emitter the emitter to use to generate result value(s)
32+
* @throws Throwable signaled as {@code onError} for the downstream.
33+
*/
34+
void transform(T value, VirtualEmitter<R> emitter) throws Throwable;
35+
}
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.internal.virtual;
15+
16+
import java.util.Objects;
17+
import java.util.concurrent.*;
18+
import java.util.concurrent.Flow.*;
19+
import java.util.concurrent.atomic.AtomicLong;
20+
21+
import io.reactivex.rxjava4.core.*;
22+
import io.reactivex.rxjava4.exceptions.Exceptions;
23+
import io.reactivex.rxjava4.internal.util.BackpressureHelper;
24+
25+
/**
26+
* Runs a generator callback on a virtual thread backed by a Worker of the given scheduler
27+
* and signals events emitted by the generator considering any downstream backpressure.
28+
*
29+
* @param <T> the element type of the flow
30+
* @since 4.0.0
31+
*/
32+
public final class FlowableVirtualCreateExecutor<T> extends Flowable<T> {
33+
34+
final VirtualGenerator<T> generator;
35+
36+
final ExecutorService executor;
37+
38+
public FlowableVirtualCreateExecutor(VirtualGenerator<T> generator, ExecutorService executor) {
39+
this.generator = generator;
40+
this.executor = executor;
41+
}
42+
43+
@Override
44+
protected void subscribeActual(Subscriber<? super T> s) {
45+
var parent = new ExecutorVirtualCreateSubscription<>(s, generator);
46+
s.onSubscribe(parent);
47+
48+
executor.submit(parent);
49+
}
50+
51+
static final class ExecutorVirtualCreateSubscription<T> extends AtomicLong
52+
implements Subscription, Callable<Void>, VirtualEmitter<T> {
53+
54+
private static final long serialVersionUID = -6959205135542203083L;
55+
56+
Subscriber<? super T> downstream;
57+
58+
final VirtualGenerator<T> generator;
59+
60+
final VirtualResumable consumerReady;
61+
62+
volatile boolean cancelled;
63+
64+
static final Throwable STOP = new Throwable("Downstream cancelled");
65+
66+
long produced;
67+
68+
ExecutorVirtualCreateSubscription(Subscriber<? super T> downstream, VirtualGenerator<T> generator) {
69+
this.downstream = downstream;
70+
this.generator = generator;
71+
this.consumerReady = new VirtualResumable();
72+
}
73+
74+
@Override
75+
public Void call() {
76+
try {
77+
try {
78+
generator.generate(this);
79+
} catch (Throwable ex) {
80+
Exceptions.throwIfFatal(ex);
81+
if (ex != STOP && !cancelled) {
82+
downstream.onError(ex);
83+
}
84+
return null;
85+
}
86+
if (!cancelled) {
87+
downstream.onComplete();
88+
}
89+
} finally {
90+
downstream = null;
91+
}
92+
return null;
93+
}
94+
95+
@Override
96+
public void request(long n) {
97+
BackpressureHelper.add(this, n);
98+
consumerReady.resume();
99+
}
100+
101+
@Override
102+
public void cancel() {
103+
cancelled = true;
104+
request(1);
105+
}
106+
107+
@Override
108+
public void emit(T item) throws Throwable {
109+
Objects.requireNonNull(item, "item is null");
110+
var p = produced;
111+
while (get() == p && !cancelled) {
112+
consumerReady.await();
113+
}
114+
115+
if (cancelled) {
116+
throw STOP;
117+
}
118+
119+
downstream.onNext(item);
120+
produced = p + 1;
121+
}
122+
}
123+
}

0 commit comments

Comments
 (0)