-
Notifications
You must be signed in to change notification settings - Fork 110
RxScala <-> RxJava converters #207
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 7 commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
5e868c4
JavaConverters
rvanheest 68359b1
documentation
rvanheest 0b1e181
fix typo
rvanheest 966315d
license
rvanheest 9a406e1
spaces rather than tabs
rvanheest 7b3321f
more spaces
rvanheest 55b816b
demos and bugfixes
rvanheest 2923e83
Unit types in test methods
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,84 @@ | ||
| package examples | ||
|
|
||
| import java.util.concurrent.CountDownLatch | ||
|
|
||
| import org.junit.{Ignore, Test} | ||
| import org.scalatest.junit.JUnitSuite | ||
| import rx.functions.Action1 | ||
| import rx.lang.scala.JavaConverters._ | ||
| import rx.lang.scala.{Observable, Observer, Subscriber, Subscription} | ||
|
|
||
| import scala.language.postfixOps | ||
|
|
||
| @Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily | ||
| class JavaConvertersDemo extends JUnitSuite { | ||
|
|
||
| @Test | ||
| def javaObservableToScalaObservableConverter() = { | ||
|
|
||
| // given a RxJava Observable (for example from some third party library)... | ||
| def getJavaObservableFromSomewhere: rx.Observable[Int] = | ||
| rx.Observable.just(1, 2, 3) | ||
|
|
||
| // it is possible to transform this into a RxScala Observable using `asScala` | ||
| // after that you can use all Scala style operators | ||
| getJavaObservableFromSomewhere.asScala | ||
| .map(2 *) | ||
| .filter(_ % 4 == 0) | ||
| .subscribe(println, _.printStackTrace(), () => println("done")) | ||
| } | ||
|
|
||
| @Test | ||
| def scalaObservableToJavaObservableConverter() = { | ||
|
|
||
| // given a function that takes a RxJava Observable as its input and returns a RxJava Subscription ... | ||
| def useJavaObservableSomewhere(obs: rx.Observable[_ <: Int]): rx.Subscription = | ||
| obs.subscribe(new Action1[Int] { override def call(i: Int): Unit = println(i) }) | ||
|
|
||
| // you can give a RxScala Observable as input and call `asJava`on it... | ||
| val javaSubscription: rx.Subscription = useJavaObservableSomewhere(Observable.just(1, 2, 3).asJava) | ||
|
|
||
| // after which you get a RxJava Subscription back, which can be used again in a RxScala setting using `asScalaSubscription`... | ||
| val scalaSubscription: Subscription = javaSubscription.asScalaSubscription | ||
|
|
||
| // and convert it back to a RxJava Subscription using `asJavaSubscription` | ||
| val javaSubscriptionAgain: rx.Subscription = scalaSubscription.asJavaSubscription | ||
| } | ||
|
|
||
| @Test | ||
| def schedulerConverting() = { | ||
| // the next line is not part of the actual example, but is just to prevent early termination due to scheduling | ||
| val latch = new CountDownLatch(1) | ||
|
|
||
|
|
||
| // given a function that return a particular RxJava-specific scheduler (for example the | ||
| // JavaFxScheduler from RxJavaFx (https://github.com/ReactiveX/RxJavaFX))... | ||
| def getJavaSpecificScheduler: rx.Scheduler = | ||
| rx.schedulers.Schedulers.computation() | ||
|
|
||
| Observable.just(1, 2, 3) | ||
| .doOnNext(_ => println(Thread.currentThread().getName)) // should print "main" | ||
| .observeOn(getJavaSpecificScheduler.asScala) | ||
| .doOnNext(_ => println(Thread.currentThread().getName)) // should print the name of the scheduler as its side effect | ||
| .subscribe(println, e => { e.printStackTrace(); latch.countDown() }, () => { println("done"); latch.countDown() }) | ||
|
|
||
|
|
||
| latch.await() | ||
| } | ||
|
|
||
| def observableCreate(): Unit = { | ||
| Observable((subscriber: Subscriber[Int]) => { | ||
| // As a subscriber is both a Subscriber, Observer and Subscription, you need some way to distinguish between them | ||
| // Observer uses the regular `asJava` and `asScala` operators; | ||
| // Subscriber uses `asJavaSubscriber` and `asScalaSubscriber`; | ||
| // Subscription uses the `asJavaSubscription` and `asScalaSubscription` instead. | ||
| val jObserver: rx.Observer[_ >: Int] = subscriber.asJava | ||
| val jSubscriber: rx.Subscriber[_ >: Int] = subscriber.asJavaSubscriber | ||
| val jSubscription: rx.Subscription = subscriber.asJavaSubscription | ||
|
|
||
| val sObserver: Observer[_ >: Int] = jObserver.asScala | ||
| val sSubscriber: Subscriber[_ >: Int] = jSubscriber.asScalaSubscriber | ||
| val sSubscription: Subscription = jSubscription.asScalaSubscription | ||
| }) | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,133 @@ | ||
| /** | ||
| * Copyright 2014 Netflix, Inc. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package rx.lang.scala | ||
|
|
||
| import scala.language.implicitConversions | ||
| import Decorators._ | ||
|
|
||
| /** | ||
| * Provides conversion functions `asJava` and `asScala` to convert | ||
| * between RxScala types and RxJava types. | ||
| * | ||
| * Example: | ||
| * {{{ | ||
| * import rx.lang.scala.JavaConverters._ | ||
| * val javaObs = Observable.just(1, 2, 3).asJava | ||
| * val scalaObs = javaObs.asScala | ||
| * }}} | ||
| */ | ||
| object JavaConverters extends DecorateAsJava with DecorateAsScala | ||
|
|
||
| private[scala] trait Decorators { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (sorry for nitpicking) Code indentation in this project is 2 spaces, could you please adhere to this? |
||
|
|
||
| class AsJava[A](op: => A) { | ||
| def asJava: A = op | ||
| } | ||
|
|
||
| class AsJavaSubscription(s: Subscription) { | ||
| def asJavaSubscription: rx.Subscription = s.asJavaSubscription | ||
| } | ||
|
|
||
| class AsJavaSubscriber[A](s: Subscriber[A]) { | ||
| def asJavaSubscriber: rx.Subscriber[_ >: A] = s.asJavaSubscriber | ||
| } | ||
|
|
||
| class AsScala[A](op: => A) { | ||
| def asScala: A = op | ||
| } | ||
|
|
||
| class AsScalaSubscription(s: rx.Subscription) { | ||
| def asScalaSubscription: Subscription = Subscription(s) | ||
| } | ||
|
|
||
| class AsScalaSubscriber[A](s: rx.Subscriber[_ >: A]) { | ||
| def asScalaSubscriber: Subscriber[A] = Subscriber(s) | ||
| } | ||
| } | ||
|
|
||
| private[scala] object Decorators extends Decorators | ||
|
|
||
| /** | ||
| * These functions convert RxScala types to RxJava types. | ||
| * Pure Scala projects won't need them, but they will be useful for polyglot projects. | ||
| */ | ||
| trait DecorateAsJava { | ||
|
|
||
| implicit def toJavaNotification[T](s: Notification[T]): AsJava[rx.Notification[_ <: T]] = | ||
| new AsJava(s.asJavaNotification) | ||
|
|
||
| implicit def toJavaSubscription(s: Subscription): AsJavaSubscription = | ||
| new AsJavaSubscription(s) | ||
|
|
||
| implicit def toJavaSubscriber[T](s: Subscriber[T]): AsJavaSubscriber[T] = | ||
| new AsJavaSubscriber(s) | ||
|
|
||
| implicit def toJavaScheduler(s: Scheduler): AsJava[rx.Scheduler] = | ||
| new AsJava(s.asJavaScheduler) | ||
|
|
||
| implicit def toJavaWorker(s: Worker): AsJava[rx.Scheduler.Worker] = | ||
| new AsJava(s.asJavaWorker) | ||
|
|
||
| implicit def toJavaObserver[T](s: Observer[T]): AsJava[rx.Observer[_ >: T]] = | ||
| new AsJava(s.asJavaObserver) | ||
|
|
||
| implicit def toJavaObservable[T](s: Observable[T]): AsJava[rx.Observable[_ <: T]] = | ||
| new AsJava(s.asJavaObservable) | ||
|
|
||
| private type jOperator[R, T] = rx.Observable.Operator[R, T] | ||
|
|
||
| implicit def toJavaOperator[T, R](operator: Subscriber[R] => Subscriber[T]): AsJava[jOperator[R, T]] = { | ||
| val jOp = new jOperator[R, T] { | ||
| override def call(subscriber: rx.Subscriber[_ >: R]): rx.Subscriber[_ >: T] = { | ||
| import JavaConverters.toScalaSubscriber | ||
| operator(subscriber.asScalaSubscriber).asJavaSubscriber | ||
| } | ||
| } | ||
| new AsJava[jOperator[R, T]](jOp) | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * These functions convert RxJava types to RxScala types. | ||
| * Pure Scala projects won't need them, but they will be useful for polyglot projects. | ||
| */ | ||
| trait DecorateAsScala { | ||
|
|
||
| implicit def toScalaNotification[T](s: rx.Notification[_ <: T]): AsScala[Notification[T]] = | ||
| new AsScala(Notification(s)) | ||
|
|
||
| implicit def toScalaSubscription(s: rx.Subscription): AsScalaSubscription = | ||
| new AsScalaSubscription(s) | ||
|
|
||
| implicit def toScalaSubscriber[T](s: rx.Subscriber[_ >: T]): AsScalaSubscriber[T] = | ||
| new AsScalaSubscriber(s) | ||
|
|
||
| implicit def toScalaScheduler(s: rx.Scheduler): AsScala[Scheduler] = | ||
| new AsScala(Scheduler(s)) | ||
|
|
||
| implicit def toScalaWorker(s: rx.Scheduler.Worker): AsScala[Worker] = | ||
| new AsScala(Worker(s)) | ||
|
|
||
| implicit def toScalaObserver[T](s: rx.Observer[_ >: T]): AsScala[Observer[T]] = | ||
| new AsScala(Observer(s)) | ||
|
|
||
| implicit def toScalaObservable[T](s: rx.Observable[_ <: T]): AsScala[Observable[T]] = { | ||
| val obs = new Observable[T] { | ||
| val asJavaObservable: rx.Observable[_ <: T] = s | ||
| } | ||
| new AsScala[Observable[T]](obs) | ||
| } | ||
| } | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an excellent demo 😃