Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions examples/src/test/scala/rx/lang/scala/examples/RxScalaDemo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package rx.lang.scala.examples

import java.io.IOException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

import rx.lang.scala.subjects.SerializedSubject
Expand All @@ -27,6 +28,7 @@ import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.concurrent.duration.DurationInt
import scala.concurrent.duration.DurationLong
import scala.concurrent.ExecutionContext
import scala.language.postfixOps
import scala.language.implicitConversions

Expand Down Expand Up @@ -1358,6 +1360,25 @@ class RxScalaDemo extends JUnitSuite {
subscription.unsubscribe()
}

def sayHelloFromExecutionContext(ec: ExecutionContext): Unit = {
val scheduler = ExecutionContextScheduler(ec)
val latch = new CountDownLatch(1)
Observable.just("Hello").subscribeOn(scheduler).subscribe(
v => println(s"$v from ${Thread.currentThread.getName}"),
e => e.printStackTrace(),
() => latch.countDown()
)
latch.await(5, TimeUnit.SECONDS)
}

@Test def executionContextSchedulerExample(): Unit = {
sayHelloFromExecutionContext(scala.concurrent.ExecutionContext.Implicits.global)
}

@Test def executionContextSchedulerExample2(): Unit = {
sayHelloFromExecutionContext(ExecutionContext.fromExecutor(Executors.newCachedThreadPool()))
}

def createAHotObservable: Observable[String] = {
var first = true
Observable[String] {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.lang.scala.schedulers

import java.util.concurrent.Executor
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import rx.lang.scala.Scheduler

object ExecutionContextScheduler {

/**
* Returns a [[rx.lang.scala.Scheduler]] that executes work on the specified `ExecutionContext`.
*/
def apply(executor: ExecutionContext): ExecutionContextScheduler = {
val jExecutor = if (executor.isInstanceOf[ExecutionContextExecutor]) {
executor.asInstanceOf[ExecutionContextExecutor]
} else {
new Executor {
override def execute(command: Runnable): Unit = executor.execute(command)
}
}
new ExecutionContextScheduler(rx.schedulers.Schedulers.from(jExecutor))
}
}

class ExecutionContextScheduler private[scala](val asJavaScheduler: rx.Scheduler) extends Scheduler
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.lang.scala.schedulers

import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{CountDownLatch, Executors, ThreadFactory, TimeUnit}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.language.postfixOps

import org.junit.Test
import org.scalatest.junit.JUnitSuite

class ExecutionContextSchedulerTest extends JUnitSuite {

private val prefix = "ExecutionContextSchedulerTest"
private val threadFactory = new ThreadFactory {

private val id = new AtomicLong(0)

override def newThread(r: Runnable): Thread = {
val t = new Thread(r, prefix + id.incrementAndGet())
t.setDaemon(true)
t
}
}
private val scheduler = ExecutionContextScheduler(ExecutionContext.fromExecutor(Executors.newCachedThreadPool(threadFactory)))

@Test
def testSchedule(): Unit = {
val worker = scheduler.createWorker
val latch = new CountDownLatch(1)
@volatile var threadName: String = null
worker.schedule {
threadName = Thread.currentThread.getName
latch.countDown()
}
latch.await(30, TimeUnit.SECONDS)
assert(threadName.startsWith(prefix))
}

@Test
def testScheduleWithDelay(): Unit = {
val worker = scheduler.createWorker
val latch = new CountDownLatch(1)
@volatile var threadName: String = null
worker.schedule(1 millisecond) {
threadName = Thread.currentThread.getName
latch.countDown()
}
latch.await(30, TimeUnit.SECONDS)
assert(threadName.startsWith(prefix))
}

@Test
def testSchedulePeriodically(): Unit = {
val worker = scheduler.createWorker
val latch = new CountDownLatch(1)
@volatile var threadName: String = null
val subscription = worker.schedulePeriodically(1 millisecond, 1 millisecond) {
threadName = Thread.currentThread.getName
latch.countDown()
}
latch.await(30, TimeUnit.SECONDS)
assert(threadName.startsWith(prefix))
subscription.unsubscribe()
}
}