16
16
17
17
package rx .operators ;
18
18
19
+ import java .util .Random ;
19
20
import static org .junit .Assert .assertEquals ;
20
21
import static org .junit .Assert .assertTrue ;
21
22
import static org .junit .Assert .fail ;
@@ -239,12 +240,16 @@ public void testConcurrencyAndSerialization() throws InterruptedException {
239
240
@ Override
240
241
public Observable <String > call (final GroupedObservable <Boolean , GroupedObservable <String , Integer >> outerGroup ) {
241
242
return outerGroup .flatMap (new Func1 <GroupedObservable <String , Integer >, Observable <String >>() {
242
-
243
243
@ Override
244
244
public Observable <String > call (final GroupedObservable <String , Integer > innerGroup ) {
245
245
final AtomicInteger threadsPerGroup = new AtomicInteger ();
246
246
return innerGroup .take (100 ).map (new Func1 <Integer , String >() {
247
-
247
+ final ThreadLocal <Random > tlr = new ThreadLocal <Random >() {
248
+ @ Override
249
+ protected Random initialValue () {
250
+ return new Random ();
251
+ }
252
+ };
248
253
@ Override
249
254
public String call (Integer i ) {
250
255
int outerThreadCount = outerThreads .incrementAndGet ();
@@ -256,7 +261,11 @@ public String call(Integer i) {
256
261
throw new RuntimeException ("more than 1 thread for this group [" + innerGroup .getKey () + "]: " + innerThreadCount + " (before)" );
257
262
}
258
263
try {
264
+ // give the other threads a shot.
265
+ Thread .sleep (tlr .get ().nextInt (10 ) + 1 );
259
266
return (outerGroup .getKey () ? "Even" : "Odd " ) + " => from source: " + innerGroup .getKey () + " Value: " + i ;
267
+ } catch (InterruptedException ex ) {
268
+ throw new RuntimeException ("Interrupted [" + innerGroup .getKey () + "]: " + i );
260
269
} finally {
261
270
int outerThreadCountAfter = outerThreads .decrementAndGet ();
262
271
setMaxConcurrency (maxOuterConcurrency , outerThreadCountAfter );
0 commit comments