From c1180b2f0cb79bef3ba71c20fbdc061359bf5f9d Mon Sep 17 00:00:00 2001 From: Steve Gury Date: Fri, 18 Mar 2016 17:15:34 -0700 Subject: [PATCH 1/2] Rework of the design decisions for the 2.x branch --- DESIGN.md | 326 ++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 279 insertions(+), 47 deletions(-) diff --git a/DESIGN.md b/DESIGN.md index 72f49408c6..aad6e50e33 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -1,9 +1,21 @@ ## RxJava v2 Design -Terminology, principles, contracts, and other aspects of the design of RxJava v2. +This document explains the terminology, principles, contracts, and other aspects of the design of RxJava v2. +Its intended audience is the implementers of the library. ### Terminology & Definitions +##### Interactive + +Producer obeys consumer-driven flow control. +Consumer manages capacity by requesting data. + + +##### Reactive + +Producer is in charge. Consumer has to do whatever it needs to keep up. + + ##### Hot When used to refer to a data source (such as an `Observable`), it means it does not have side-effects when subscribed to. @@ -12,41 +24,94 @@ For example, an `Observable` of mouse events. Subscribing to that `Observable` d (Note: Yes, there are *some* side-effects of adding a listener, but they are inconsequential as far as the 'hot' usage is concerned). + ##### Cold When used to refer to a data source (such as an `Observable`), it means it has side-effects when subscribed to. For example, an `Observable` of data from a remote API (such as an RPC call). Each time that `Observable` is subscribed to causes a new network call to occur. -##### Reactive + +##### Reactive/Push Producer is in charge. Consumer has to do whatever it needs to keep up. -##### Interactive +Examples: + +- `Observable` (RxJS, Rx.Net, RxJava v1.x without backpressure, RxJava v2) +- Callbacks (the producer calls the function at its convenience) +- IRQ, mouse events, IO interrupts +- 2.x `Flowable` (with `request(n)` credit always granted faster or in larger quantity than producer) +- Reactive Streams `Publisher` (with `request(n)` credit always granted faster or in larger quantity than producer) +- Java 9 `Flow.Publisher` (with `request(n)` credit always granted faster than or in larger quantity producer) + + +##### Synchronous Interactive/Pull Consumer is in charge. Producer has to do whatever it needs to keep up. -##### Push +Examples: + +- `Iterable` +- 2.x/1.x `Observable` (without concurrency, producer and consumer on the same thread) +- 2.x `Flowable` (without concurrency, producer and consumer on the same thread) +- Reactive Streams `Publisher` (without concurrency, producer and consumer on the same thread) +- Java 9 `Flow.Publisher` (without concurrency, producer and consumer on the same thread) + + +##### Async Pull (Async Interactive) + +Consumer requests data when it wishes, and the data is then pushed when the producer wishes to. + +Examples: + +- `Future` & `Promise` +- `Single` (lazy `Future`) +- 2.x `Flowable` +- Reactive Streams `Publisher` +- Java 9 `Flow.Publisher` +- 1.x `Observable` (with backpressure) +- `AsyncEnumerable`/`AsyncIterable` + +There is an overhead (performance and mental) for achieving this, which is why we also have the 2.x `Observable` without backpressure. + + +##### Flow Control + +Flow control is any mitigation strategies that a consumer applies to reduce the flow of data. -Producer emits when it wishes to. Related to "reactive". Callbacks are an instance of push. +Examples: -##### Pull +- Controlling the production of data, such as with `Iterator.next` or `Subscription.request(n)` +- Preventing the delivery of data, such as buffer, drop, sample/throttle, and debounce. -Consumer requests data when it wishes to. Related to "interactive". An `Iterable` is an instance of pull. -##### Async Pull +##### Eager + +Containing object immediately start work when it is created. + +Examples: + +- A `Future` once created has work being performed and represents the eventual value of that work. It can not be deferred once created. + + +##### Lazy + +Containing object does nothing until it is subscribed to or otherwise started. + +Examples: + +- `Observable.create` does not start any work until `Observable.subscribe` starts the work. -Consumer requests data when it wishes, and the data is then pushed when the producer wishes to. The Reactive Streams `Publisher` is an instance of "async pull", as is the 'AsyncEnumerable' in .Net. ### RxJava & Related Types ##### Observable -Stream that supports async and synchronous push. It does not support interactive flow control (`request(n)`). +Stream that supports async and synchronous push. It does *not* support interactive flow control (`request(n)`). Usable for: -- hot and cold sources - sync or async - push - 0, 1, many or infinite items @@ -56,16 +121,35 @@ Flow control support: - buffering, sampling, throttling, windowing, dropping, etc - temporal and count-based strategies +*Type Signature* + +```java +class Observable { + void subscribe(Observer observer); + + interface Observer { + void onNext(T t); + void onError(Throwable t); + void onComplete(); + void onSubscribe(Disposable d); + } +} +``` + +The rule for using this type signature is: + +> onSubscribe onNext* (onError | onComplete)? + + ##### Flowable Stream that supports async and synchronous push and pull. It supports interactive flow control (`request(n)`). Usable for: -- hot and cold sources +- pull sources +- push Observables with backpressure strategy (ie. `Observable.toFlowable(onBackpressureStrategy)`) - sync or async -- push -- pull - 0, 1, many or infinite items Flow control support: @@ -73,77 +157,225 @@ Flow control support: - buffering, sampling, throttling, windowing, dropping, etc - temporal and count-based strategies - `request(n)` consumer demand signal - - for pull-based sources, this allows batched "async pull" - - for push-based sources, this allows backpressure signals to conditionally apply strategies (i.e. drop, buffer, sample, fail, etc) + - for pull-based sources, this allows batched "async pull" + - for push-based sources, this allows backpressure signals to conditionally apply strategies (i.e. drop, first, buffer, sample, fail, etc) + +You get a flowable from: + +- Converting a Observable with a backpressure strategy +- Create from sync/async OnSubscribe API (which participate in backpressure semantics) + +*Type Signature* + +```java +class Flowable implements Flow.Publisher, io.reactivestreams.Publisher { + void subscribe(Subscriber subscriber); + + interface Subscriber implements Flow.Subscriber, io.reactivestreams.Subscriber { + void onNext(T t); + void onError(Throwable t); + void onComplete(); + void onSubscribe(Flowable.Subscription d); + } + + interface Subscription implements Flow.Subscription, io.reactivestreams.Subscription { + void cancel(); + void request(long n); + } +} +``` + +*NOTE: To support `Flow.Publisher` in Java 9+ without breaking Java 7+ compatibility, we want to use the [multi-release jar file support](http://openjdk.java.net/jeps/238).* + +The rule for using this type signature is: + +> onSubscribe onNext* (onError | onComplete)? + + +##### Single + +Lazy representation of a single response (lazy equivalent of `Future`/`Promise`). + +Usable for: + +- pull sources +- push sources being windowed or flow controlled (such as `window(1)` or `take(1)`) +- sync or async +- 1 item + +Flow control: + +- Not applicable (don't subscribe if the single response is not wanted) + +*Type Signature* + +```java +class Single { + void subscribe(Single.Subscriber subscriber); + + interface Subscriber { + void onSuccess(T t); + void onError(Throwable t); + void onSubscribe(Disposable d); + } +} +``` + +> onSubscribe (onError | onSuccess)? + + +##### Completable + +Lazy representation of a unit of work that can complete or fail + +- Semantic equivalent of `Observable.empty().doOnSubscribe()`. +- Alternative for scenarios often represented with types such as `Single` or `Observable`. + +Usable for: + +- sync or async +- 0 items + +*Type Signature* + +```java +class Completable { + void subscribe(Completable.Subscriber subscriber); + + interface Subscriber { + void onComplete(); + void onError(Throwable t); + void onSubscribe(Disposable d); + } +} +``` + +> onSubscribe (onError | onComplete)? ##### Observer -Consumer of events without flow control. +Reactive consumer of events (without consumer-driven flow control). Involved in subscription lifecycle to allow unsubscription. ##### Publisher -[Reactive Streams producer](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.0/README.md#1-publisher-code) of data +Interactive producer of events (with flow control). Implemented by `Flowable`. + +[Reactive Streams producer](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.0/README.md#1-publisher-code) of data. ##### Subscriber +Interactive consumer of events (with consumer-driven flow control). Involved in subscription lifecycle to allow unsubscription. + [Reactive Streams consumer](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.0/README.md#2-subscriber-code) of data. ##### Subscription [Reactive Streams state](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.0/README.md#3-subscription-code) of subscription supporting flow control and cancellation. +`Disposable` is a similar type used for lifecycle management on the `Observable` type without interactive flow control. + ##### Processor [Reactive Streams operator](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.0/README.md#4processor-code) for defining behavior between `Publisher` and `Subscriber`. It must obey the contracts of `Publisher` and `Subscriber`, meaning it is sequential, serialized, and must obey `request(n)` flow control. ##### Subject -A "hot", push-based data source that allows a producer to emit events to it and consumers to subscribe to events in a multicast manner. It is "hot" because consumers subscribing to it does not cause side-effects, or affect the data flow in any way. It is push-based and reactive because the producer is fully in charge. +A "hot", push-based data source that allows a producer to emit events to it and consumers to subscribe to events in a multicast manner. It is "hot" because consumers subscribing to it does not cause side-effects, or affect the data flow in any way. It is push-based and reactive because the producer is fully in charge. -Relation to Reactive Streams +A `Subject` is used to decouple unsubscription. Termination is fully in the control of the producer. `onError` and `onComplete` are still terminal events. +`Subject`s are stateful and retain their terminal state (for replaying to all/future subscribers). -- It can not implement Reactive Streams `Publisher` unless it is created with a default flow control strategy. -- It can not implement `Processor` since a `Processor` must compose `request(n)` which can not be done with multicasting or pure push. +Relation to Reactive Streams: -Flow control support: +- It can not implement Reactive Streams `Publisher` unless it is created with a default consumer-driven flow control strategy. +- It can not implement `Processor` since a `Processor` must compose `request(n)` which can not be done with multicasting or push. + +Here is an approach to converting from a `Subject` to Reactive Streams types by adding a default flow control strategy: + +```java +Subject s = PublishSubject.create(); +// convert to Publisher with backpressure strategy +Publisher p = s.toPublisher(onBackpressureStrategy); + +// now the request(n) semantics are handled by default +p.subscribe(subscriber1); +p.subscribe(subscriber2); +``` + +In this example, `subscriber1` and `subscriber2` can consume at different rates, `request(n)` will propagate to the provided `onBackpressureStrategy`, not the original `Subject` which can't propagate `request(n)` upstream. -- buffering, sampling, throttling, windowing, dropping, etc -- temporal and count-based strategies -- It does not support pull-based consumer-driven flow control. ##### Disposable A type representing work that can be cancelled or disposed. -### Behavior +Examples: -##### Creation +- An `Observable.subscribe` passes a `Disposable` to the `Observable.onSubscribe` to allow the `Observer` to dispose of the subscription. +- A `Scheduler` returns a `Disposable` that you use for disposing of the `Scheduler`. -Creation of a stream falls into the following use cases, all of which should be catered to in API design. +`Subscription` is a similar type used for lifecycle management on the `Flowable` type with interactive flow control. -- async, hot, push (ie. system or user events) -- async, cold, push (ie. events resulting from remote system via network connection) -- sync, cold, pull (ie. iterable, file, range) -- async, cold, pull (ie. RPC/REST network call, cross-thread queue draining) - -Unknown: +##### Operator -- hot, pull (what is an example of this?) +An operator follows a specific lifecycle (union of the producer/consumer contract). -Flow control support: +- It must propagate the `subscribe` event upstream (to the producer). +- It must obey the RxJava contract (serialize all events, `onError`/`onComplete` are terminal). +- If it has resources to cleanup it is responsible for watching `onError`, `onComplete`, and `cancel/dispose`, and doing the necessary cleanup. +- It must propagate the `cancel/dispose` upstream. + +In the addition of the previous rules, an operator for `Flowable`: + +- It must propagate/negotiate the `request(n)` event. + + +### Creation + +Creation of the various types should be exposed through factory methods that provide safe construction. + +```java +Flowable.create(SyncGenerator generator) + +Flowable.create(AsyncGenerator generator) + +Observable.create(OnSubscribe> onSubscribe) + +Single.create(OnSubscribe> onSubscribe) + +Completable.create(OnSubscribe> onSubscribe) +``` + +### Terminal behavior + +A producer can terminate a stream by emitting `onComplete` or `onError`. A consumer can terminate a stream by calling `cancel`/`dispose`. + +Any resource cleanup of the source or operators must account for any of these three termination events. In other words, if an operator needs cleanup, then it should register the cleanup callback with `cancel`/`dispose`, `onError` and `onComplete`. + +The final `subscribe` will *not* invoke `cancel`/`dispose` after receiving an `onComplete` or `onError`. + +### JVM target and source compatibility + +The 2.x version will target JDK6+ but the source will be using some of the Java 8 features. +We'll use the retrolambda project to generate JDK6 Byte-code from the Java 8 sources. +The only Java 8 features we intend to use are: lambda expressions, method references and try-with-resources statements. +Default methods may be considered if retrolambda support reach an acceptable level of maturity. + +It still up to discussion to know how we will generate multiple targets? +Options are: + +- two artifcat ids: rxjava-jdk6, rxjava-jdk8 +- classifiers: jdk6, jdk8 -- If `request(n)` behavior is supported in the stream implementation, then: - - pull-based creation must support `request(n)` semantics - - push-based creation must provide a default *onBackpressure* strategy -- If `request(n)` behavior is not supported in the stream implementation, then: - - push-based creation can push without consideration of a backpressure strategy - - pull-based creation should be discouraged +The intent is to let Android users consume the new version of RxJava. +(Not sure if JDK6 is the appropriate requirement, JDK7+ is maybe sufficient). -##### Destruction +##### Custom Observable, Single, Completable, or Flowable (To be confirmed) -A producer can terminate a stream by emitting `onComplete` or `onError`. A consumer can terminate a stream by calling `cancel`. +An implementation of an Observable which can be subscribed to with an `Observer`. Custom (`Observable`|`Single`|`Completable`|`Flowable`) classes would implement the interface `Consumable` where the generic type `S` is a `Flowable.Subscriber`, `Observable.Observer`, `Completable.Subscriber`, or a `Single.Subscriber` depending on which semantics the custom class will follow. This choice would also affect the kinds of observables the custom class could interop with. For instance, `Flowable#merge` could operate over the standard `Flowable` or any custom class which implements `Consumable>`. -Any resource cleanup of the source or operators must account for any of these three termination events. In other words, if an operator needs cleanup, then it should register the cleanup callback with `cancel`, `onError` and `onComplete`. +For more information see the proof of concept project [Consumable](https://github.com/stealthcode/Consumable). -The final `subscribe` will *not* invoke `cancel` after receiving an `onComplete` or `onError`. +### Fusion (To be confirmed) +We intend to enable operator fusion, but we don't have any specification yet. Nothing we do here should prevent the implementation of fusion. From ac5281c0dee1e0bdd8fe79644860cfc7c7e16bfc Mon Sep 17 00:00:00 2001 From: Aaron Tull Date: Mon, 28 Mar 2016 15:31:59 -0700 Subject: [PATCH 2/2] Changed the wording of the Consumable and Fusion sections --- DESIGN.md | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/DESIGN.md b/DESIGN.md index aad6e50e33..cd1792691d 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -370,12 +370,14 @@ Options are: The intent is to let Android users consume the new version of RxJava. (Not sure if JDK6 is the appropriate requirement, JDK7+ is maybe sufficient). -##### Custom Observable, Single, Completable, or Flowable (To be confirmed) +### Future work -An implementation of an Observable which can be subscribed to with an `Observer`. Custom (`Observable`|`Single`|`Completable`|`Flowable`) classes would implement the interface `Consumable` where the generic type `S` is a `Flowable.Subscriber`, `Observable.Observer`, `Completable.Subscriber`, or a `Single.Subscriber` depending on which semantics the custom class will follow. This choice would also affect the kinds of observables the custom class could interop with. For instance, `Flowable#merge` could operate over the standard `Flowable` or any custom class which implements `Consumable>`. +This section contains current design work which needs more discussion and elaboration before it is merged into this document as a stated goal for 2.x. Our goal is -For more information see the proof of concept project [Consumable](https://github.com/stealthcode/Consumable). +#### Custom Observable, Single, Completable, or Flowable -### Fusion (To be confirmed) +We are investigate a base interface (similar to `Publisher`) for the `Observable`, `Single`, and `Completable` (currently referred to as `Consumable` or `ConsumableObservable`). This would empower library owners and api developers to implement their own type of `Observable`, `Single`, or `Completable` without extending the class. This would result in a change the type signatures of `subscribe` as well as any operator that operates over an `Observable`, `Single`, or `Completable` to accept a more generic type (i.e. `ConsumableObservable`). For more information see the proof of concept project [Consumable](https://github.com/stealthcode/Consumable). + +#### Fusion (To be confirmed) We intend to enable operator fusion, but we don't have any specification yet. Nothing we do here should prevent the implementation of fusion.