diff --git a/examples/src/test/scala/rx/lang/scala/examples/RxScalaDemo.scala b/examples/src/test/scala/rx/lang/scala/examples/RxScalaDemo.scala index 6ea92f42..48d8ef21 100644 --- a/examples/src/test/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/examples/src/test/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -17,6 +17,7 @@ package rx.lang.scala.examples import java.io.IOException import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import rx.lang.scala.subjects.SerializedSubject @@ -27,6 +28,7 @@ import scala.collection.mutable import scala.concurrent.duration.Duration import scala.concurrent.duration.DurationInt import scala.concurrent.duration.DurationLong +import scala.concurrent.ExecutionContext import scala.language.postfixOps import scala.language.implicitConversions @@ -1358,6 +1360,25 @@ class RxScalaDemo extends JUnitSuite { subscription.unsubscribe() } + def sayHelloFromExecutionContext(ec: ExecutionContext): Unit = { + val scheduler = ExecutionContextScheduler(ec) + val latch = new CountDownLatch(1) + Observable.just("Hello").subscribeOn(scheduler).subscribe( + v => println(s"$v from ${Thread.currentThread.getName}"), + e => e.printStackTrace(), + () => latch.countDown() + ) + latch.await(5, TimeUnit.SECONDS) + } + + @Test def executionContextSchedulerExample(): Unit = { + sayHelloFromExecutionContext(scala.concurrent.ExecutionContext.Implicits.global) + } + + @Test def executionContextSchedulerExample2(): Unit = { + sayHelloFromExecutionContext(ExecutionContext.fromExecutor(Executors.newCachedThreadPool())) + } + def createAHotObservable: Observable[String] = { var first = true Observable[String] { diff --git a/src/main/scala/rx/lang/scala/schedulers/ExecutionContextScheduler.scala b/src/main/scala/rx/lang/scala/schedulers/ExecutionContextScheduler.scala new file mode 100644 index 00000000..9657c975 --- /dev/null +++ b/src/main/scala/rx/lang/scala/schedulers/ExecutionContextScheduler.scala @@ -0,0 +1,39 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.lang.scala.schedulers + +import java.util.concurrent.Executor +import scala.concurrent.{ExecutionContext, ExecutionContextExecutor} +import rx.lang.scala.Scheduler + +object ExecutionContextScheduler { + + /** + * Returns a [[rx.lang.scala.Scheduler]] that executes work on the specified `ExecutionContext`. + */ + def apply(executor: ExecutionContext): ExecutionContextScheduler = { + val jExecutor = if (executor.isInstanceOf[ExecutionContextExecutor]) { + executor.asInstanceOf[ExecutionContextExecutor] + } else { + new Executor { + override def execute(command: Runnable): Unit = executor.execute(command) + } + } + new ExecutionContextScheduler(rx.schedulers.Schedulers.from(jExecutor)) + } +} + +class ExecutionContextScheduler private[scala](val asJavaScheduler: rx.Scheduler) extends Scheduler diff --git a/src/test/scala/rx/lang/scala/schedulers/ExecutionContextSchedulerTest.scala b/src/test/scala/rx/lang/scala/schedulers/ExecutionContextSchedulerTest.scala new file mode 100644 index 00000000..3e7d7b25 --- /dev/null +++ b/src/test/scala/rx/lang/scala/schedulers/ExecutionContextSchedulerTest.scala @@ -0,0 +1,82 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.lang.scala.schedulers + +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.{CountDownLatch, Executors, ThreadFactory, TimeUnit} + +import scala.concurrent.ExecutionContext +import scala.concurrent.duration._ +import scala.language.postfixOps + +import org.junit.Test +import org.scalatest.junit.JUnitSuite + +class ExecutionContextSchedulerTest extends JUnitSuite { + + private val prefix = "ExecutionContextSchedulerTest" + private val threadFactory = new ThreadFactory { + + private val id = new AtomicLong(0) + + override def newThread(r: Runnable): Thread = { + val t = new Thread(r, prefix + id.incrementAndGet()) + t.setDaemon(true) + t + } + } + private val scheduler = ExecutionContextScheduler(ExecutionContext.fromExecutor(Executors.newCachedThreadPool(threadFactory))) + + @Test + def testSchedule(): Unit = { + val worker = scheduler.createWorker + val latch = new CountDownLatch(1) + @volatile var threadName: String = null + worker.schedule { + threadName = Thread.currentThread.getName + latch.countDown() + } + latch.await(30, TimeUnit.SECONDS) + assert(threadName.startsWith(prefix)) + } + + @Test + def testScheduleWithDelay(): Unit = { + val worker = scheduler.createWorker + val latch = new CountDownLatch(1) + @volatile var threadName: String = null + worker.schedule(1 millisecond) { + threadName = Thread.currentThread.getName + latch.countDown() + } + latch.await(30, TimeUnit.SECONDS) + assert(threadName.startsWith(prefix)) + } + + @Test + def testSchedulePeriodically(): Unit = { + val worker = scheduler.createWorker + val latch = new CountDownLatch(1) + @volatile var threadName: String = null + val subscription = worker.schedulePeriodically(1 millisecond, 1 millisecond) { + threadName = Thread.currentThread.getName + latch.countDown() + } + latch.await(30, TimeUnit.SECONDS) + assert(threadName.startsWith(prefix)) + subscription.unsubscribe() + } +}