From 7425dbd2388d84c665c3b2463ddc1c24a7f75107 Mon Sep 17 00:00:00 2001 From: Luke Pighetti Date: Thu, 26 Sep 2019 08:44:33 -0400 Subject: [PATCH 1/3] add ValueObservable.combineLatestN + tests --- lib/src/observables/value_observable.dart | 324 ++++++++++++++++++++++ test/streams/combine_latest_test.dart | 217 +++++++++++++++ 2 files changed, 541 insertions(+) diff --git a/lib/src/observables/value_observable.dart b/lib/src/observables/value_observable.dart index 70d824573..b02f96e29 100644 --- a/lib/src/observables/value_observable.dart +++ b/lib/src/observables/value_observable.dart @@ -1,4 +1,7 @@ +import 'dart:async'; + import 'package:rxdart/src/observables/observable.dart'; +import 'package:rxdart/streams.dart'; /// An [Observable] that provides synchronous access to the last emitted item abstract class ValueObservable implements Observable { @@ -7,4 +10,325 @@ abstract class ValueObservable implements Observable { T get value; bool get hasValue; + + /// Merges the given ValueObservables into one ValueObservable sequence by using the + /// [combiner] function whenever any of the observable sequences emits an + /// item. + /// + /// The new ValueObservable will be seeded based on the initial value of each input ValueObservable. + /// + /// [Interactive marble diagram](http://rxmarbles.com/#combineLatest) + /// + /// ### Example + /// + /// final foo = ValueObservable.combineLatest3( + /// new Observable.just("a").shareValueSeeded("a"), + /// new Observable.fromIterable(["b", "b"]).shareValueSeeded("b"), + /// (a, b) => a + b); + /// + /// print(foo.value); // prints "abc" + /// foo.listen(print); //prints "abc", "abc" + static ValueObservable combineLatest2(ValueObservable streamA, + ValueObservable streamB, T combiner(A a, B b)) => + Observable(CombineLatestStream.combine2( + streamA.skip(1), streamB.skip(1), combiner)) + .shareValueSeeded(combiner(streamA.value, streamB.value)); + + /// Merges the given ValueObservables into one ValueObservable sequence by using the + /// [combiner] function whenever any of the observable sequences emits an + /// item. + /// + /// The new ValueObservable will be seeded based on the initial value of each input ValueObservable. + /// + /// [Interactive marble diagram](http://rxmarbles.com/#combineLatest) + /// + /// ### Example + /// + /// final foo = ValueObservable.combineLatest3( + /// new Observable.just("a").shareValueSeeded("a"), + /// new Observable.just("b").shareValueSeeded("b"), + /// new Observable.fromIterable(["c", "c"]).shareValueSeeded("c"), + /// (a, b, c) => a + b + c); + /// + /// print(foo.value); // prints "abc" + /// foo.listen(print); //prints "abc", "abc" + static ValueObservable combineLatest3( + ValueObservable streamA, + ValueObservable streamB, + ValueObservable streamC, + T combiner(A a, B b, C c)) => + Observable(CombineLatestStream.combine3( + streamA.skip(1), streamB.skip(1), streamC.skip(1), combiner)) + .shareValueSeeded( + combiner(streamA.value, streamB.value, streamC.value)); + + /// Merges the given ValueObservables into one ValueObservable sequence by using the + /// [combiner] function whenever any of the observable sequences emits an + /// item. + /// + /// The new ValueObservable will be seeded based on the initial value of each input ValueObservable. + /// + /// [Interactive marble diagram](http://rxmarbles.com/#combineLatest) + /// + /// ### Example + /// + /// final foo = ValueObservable.combineLatest3( + /// new Observable.just("a").shareValueSeeded("a"), + /// new Observable.just("b").shareValueSeeded("b"), + /// new Observable.just("c").shareValueSeeded("c"), + /// new Observable.fromIterable(["d", "d"]).shareValueSeeded("d"), + /// (a, b, c, d) => a + b + c + d); + /// + /// print(foo.value); // prints "abcd" + /// foo.listen(print); //prints "abcd", "abcd" + static ValueObservable combineLatest4( + ValueObservable streamA, + ValueObservable streamB, + ValueObservable streamC, + ValueObservable streamD, + T combiner(A a, B b, C c, D d)) => + Observable(CombineLatestStream.combine4(streamA.skip(1), + streamB.skip(1), streamC.skip(1), streamD.skip(1), combiner)) + .shareValueSeeded(combiner( + streamA.value, streamB.value, streamC.value, streamD.value)); + + /// Merges the given ValueObservables into one ValueObservable sequence by using the + /// [combiner] function whenever any of the observable sequences emits an + /// item. + /// + /// The new ValueObservable will be seeded based on the initial value of each input ValueObservable. + /// + /// [Interactive marble diagram](http://rxmarbles.com/#combineLatest) + /// + /// ### Example + /// + /// final foo = ValueObservable.combineLatest3( + /// new Observable.just("a").shareValueSeeded("a"), + /// new Observable.just("b").shareValueSeeded("b"), + /// new Observable.just("c").shareValueSeeded("c"), + /// new Observable.just("d").shareValueSeeded("d"), + /// new Observable.fromIterable(["e", "e"]).shareValueSeeded("e"), + /// (a, b, c, d, e) => a + b + c + d + e); + /// + /// print(foo.value); // prints "abcde" + /// foo.listen(print); //prints "abcde", "abcde" + static ValueObservable combineLatest5( + ValueObservable streamA, + ValueObservable streamB, + ValueObservable streamC, + ValueObservable streamD, + ValueObservable streamE, + T combiner(A a, B b, C c, D d, E e)) => + Observable(CombineLatestStream.combine5( + streamA.skip(1), + streamB.skip(1), + streamC.skip(1), + streamD.skip(1), + streamE.skip(1), + combiner)) + .shareValueSeeded(combiner(streamA.value, streamB.value, + streamC.value, streamD.value, streamE.value)); + + /// Merges the given ValueObservables into one ValueObservable sequence by using the + /// [combiner] function whenever any of the observable sequences emits an + /// item. + /// + /// The new ValueObservable will be seeded based on the initial value of each input ValueObservable. + /// + /// [Interactive marble diagram](http://rxmarbles.com/#combineLatest) + /// + /// ### Example + /// + /// final foo = ValueObservable.combineLatest3( + /// new Observable.just("a").shareValueSeeded("a"), + /// new Observable.just("b").shareValueSeeded("b"), + /// new Observable.just("c").shareValueSeeded("c"), + /// new Observable.just("d").shareValueSeeded("d"), + /// new Observable.just("e").shareValueSeeded("e"), + /// new Observable.fromIterable(["f", "f"]).shareValueSeeded("f"), + /// (a, b, c, d, e, f) => a + b + c + d + e + f); + /// + /// print(foo.value); // prints "abcdef" + /// foo.listen(print); //prints "abcdef", "abcdef" + static ValueObservable combineLatest6( + ValueObservable streamA, + ValueObservable streamB, + ValueObservable streamC, + ValueObservable streamD, + ValueObservable streamE, + ValueObservable streamF, + T combiner(A a, B b, C c, D d, E e, F f)) => + Observable(CombineLatestStream.combine6( + streamA.skip(1), + streamB.skip(1), + streamC.skip(1), + streamD.skip(1), + streamE.skip(1), + streamF.skip(1), + combiner)) + .shareValueSeeded(combiner(streamA.value, streamB.value, + streamC.value, streamD.value, streamE.value, streamF.value)); + + /// Merges the given ValueObservables into one ValueObservable sequence by using the + /// [combiner] function whenever any of the observable sequences emits an + /// item. + /// + /// The new ValueObservable will be seeded based on the initial value of each input ValueObservable. + /// + /// [Interactive marble diagram](http://rxmarbles.com/#combineLatest) + /// + /// ### Example + /// + /// final foo = ValueObservable.combineLatest3( + /// new Observable.just("a").shareValueSeeded("a"), + /// new Observable.just("b").shareValueSeeded("b"), + /// new Observable.just("c").shareValueSeeded("c"), + /// new Observable.just("d").shareValueSeeded("d"), + /// new Observable.just("e").shareValueSeeded("e"), + /// new Observable.just("f").shareValueSeeded("f"), + /// new Observable.fromIterable(["g", "g"]).shareValueSeeded("g"), + /// (a, b, c, d, e, f, g) => a + b + c + d + e + f + g); + /// + /// print(foo.value); // prints "abcdefg" + /// foo.listen(print); //prints "abcdefg", "abcdefg" + static ValueObservable combineLatest7( + ValueObservable streamA, + ValueObservable streamB, + ValueObservable streamC, + ValueObservable streamD, + ValueObservable streamE, + ValueObservable streamF, + ValueObservable streamG, + T combiner(A a, B b, C c, D d, E e, F f, G g)) => + Observable(CombineLatestStream.combine7( + streamA.skip(1), + streamB.skip(1), + streamC.skip(1), + streamD.skip(1), + streamE.skip(1), + streamF.skip(1), + streamG.skip(1), + combiner)) + .shareValueSeeded(combiner( + streamA.value, + streamB.value, + streamC.value, + streamD.value, + streamE.value, + streamF.value, + streamG.value)); + + /// Merges the given ValueObservables into one ValueObservable sequence by using the + /// [combiner] function whenever any of the observable sequences emits an + /// item. + /// + /// The new ValueObservable will be seeded based on the initial value of each input ValueObservable. + /// + /// [Interactive marble diagram](http://rxmarbles.com/#combineLatest) + /// + /// ### Example + /// + /// final foo = ValueObservable.combineLatest3( + /// new Observable.just("a").shareValueSeeded("a"), + /// new Observable.just("b").shareValueSeeded("b"), + /// new Observable.just("c").shareValueSeeded("c"), + /// new Observable.just("d").shareValueSeeded("d"), + /// new Observable.just("e").shareValueSeeded("e"), + /// new Observable.just("f").shareValueSeeded("f"), + /// new Observable.just("g").shareValueSeeded("g"), + /// new Observable.fromIterable(["h", "h"]).shareValueSeeded("h"), + /// (a, b, c, d, e, f, g, h) => a + b + c + d + e + f + g + h); + /// + /// print(foo.value); // prints "abcdefgh" + /// foo.listen(print); //prints "abcdefgh", "abcdefgh" + static ValueObservable combineLatest8( + ValueObservable streamA, + ValueObservable streamB, + ValueObservable streamC, + ValueObservable streamD, + ValueObservable streamE, + ValueObservable streamF, + ValueObservable streamG, + ValueObservable streamH, + T combiner(A a, B b, C c, D d, E e, F f, G g, H h)) => + Observable( + CombineLatestStream.combine8( + streamA.skip(1), + streamB.skip(1), + streamC.skip(1), + streamD.skip(1), + streamE.skip(1), + streamF.skip(1), + streamG.skip(1), + streamH.skip(1), + combiner, + ), + ).shareValueSeeded(combiner( + streamA.value, + streamB.value, + streamC.value, + streamD.value, + streamE.value, + streamF.value, + streamG.value, + streamH.value)); + + /// Merges the given ValueObservables into one ValueObservable sequence by using the + /// [combiner] function whenever any of the observable sequences emits an + /// item. + /// + /// The new ValueObservable will be seeded based on the initial value of each input ValueObservable. + /// + /// [Interactive marble diagram](http://rxmarbles.com/#combineLatest) + /// + /// ### Example + /// + /// final foo = ValueObservable.combineLatest3( + /// new Observable.just("a").shareValueSeeded("a"), + /// new Observable.just("b").shareValueSeeded("b"), + /// new Observable.just("c").shareValueSeeded("c"), + /// new Observable.just("d").shareValueSeeded("d"), + /// new Observable.just("e").shareValueSeeded("e"), + /// new Observable.just("f").shareValueSeeded("f"), + /// new Observable.just("g").shareValueSeeded("g"), + /// new Observable.just("h").shareValueSeeded("h"), + /// new Observable.fromIterable(["i", "i"]).shareValueSeeded("i"), + /// (a, b, c, d, e, f, g, h, i) => a + b + c + d + e + f + g + h + i); + /// + /// print(foo.value); // prints "abcdefghi" + /// foo.listen(print); //prints "abcdefghi", "abcdefghi" + static ValueObservable combineLatest9( + ValueObservable streamA, + ValueObservable streamB, + ValueObservable streamC, + ValueObservable streamD, + ValueObservable streamE, + ValueObservable streamF, + ValueObservable streamG, + ValueObservable streamH, + ValueObservable streamI, + T combiner(A a, B b, C c, D d, E e, F f, G g, H h, I i)) => + Observable( + CombineLatestStream.combine9( + streamA.skip(1), + streamB.skip(1), + streamC.skip(1), + streamD.skip(1), + streamE.skip(1), + streamF.skip(1), + streamG.skip(1), + streamH.skip(1), + streamI.skip(1), + combiner, + ), + ).shareValueSeeded(combiner( + streamA.value, + streamB.value, + streamC.value, + streamD.value, + streamE.value, + streamF.value, + streamG.value, + streamH.value, + streamI.value)); } diff --git a/test/streams/combine_latest_test.dart b/test/streams/combine_latest_test.dart index d5943ffbc..ba9b35728 100644 --- a/test/streams/combine_latest_test.dart +++ b/test/streams/combine_latest_test.dart @@ -1,6 +1,7 @@ import 'dart:async'; import 'package:rxdart/rxdart.dart'; +import 'package:rxdart/src/observables/value_observable.dart'; import 'package:test/test.dart'; Stream get streamA => @@ -366,4 +367,220 @@ void main() { subscription.pause(Future.delayed(const Duration(milliseconds: 80))); }); + + /// ValueObservable + + test('rx.ValueObservable.combineLatest2', () async { + const expected = [1, 2]; + + var a = Observable.just(1).shareValueSeeded(1), + b = Observable.just(2).shareValueSeeded(2); + + final observable = ValueObservable.combineLatest2( + a, b, (int first, int second) => [first, second]); + + expect(observable.value, equals(expected)); + + observable.listen(expectAsync1((result) { + expect(result, expected); + }, count: 2)); + }); + + test('rx.ValueObservable.combineLatest3', () async { + const expected = [1, "2", 3.0]; + + var a = Observable.just(1).shareValueSeeded(1), + b = Observable.just("2").shareValueSeeded("2"), + c = Observable.just(3.0).shareValueSeeded(3.0); + + final observable = ValueObservable.combineLatest3(a, b, c, + (int first, String second, double third) => [first, second, third]); + + expect(observable.value, equals(expected)); + + observable.listen(expectAsync1((result) { + expect(result, expected); + }, count: 2)); + }); + + test('rx.ValueObservable.combineLatest4', () async { + const expected = [1, 2, 3, 4]; + + var a = Observable.just(1).shareValueSeeded(1), + b = Observable.just(2).shareValueSeeded(2), + c = Observable.just(3).shareValueSeeded(3), + d = Observable.just(4).shareValueSeeded(4); + + final observable = ValueObservable.combineLatest4( + a, + b, + c, + d, + (int first, int second, int third, int fourth) => + [first, second, third, fourth]); + + expect(observable.value, equals(expected)); + + observable.listen(expectAsync1((result) { + expect(result, expected); + }, count: 2)); + }); + + test('rx.ValueObservable.combineLatest5', () async { + const expected = [1, 2, 3, 4, 5]; + + var a = Observable.just(1).shareValueSeeded(1), + b = Observable.just(2).shareValueSeeded(2), + c = Observable.just(3).shareValueSeeded(3), + d = Observable.just(4).shareValueSeeded(4), + e = Observable.just(5).shareValueSeeded(5); + + final observable = ValueObservable.combineLatest5( + a, + b, + c, + d, + e, + (int first, int second, int third, int fourth, int fifth) => + [first, second, third, fourth, fifth]); + + expect(observable.value, equals(expected)); + + observable.listen(expectAsync1((result) { + expect(result, expected); + }, count: 2)); + }); + + test('rx.ValueObservable.combineLatest6', () async { + const expected = [1, 2, 3, 4, 5, 6]; + + var a = Observable.just(1).shareValueSeeded(1), + b = Observable.just(2).shareValueSeeded(2), + c = Observable.just(3).shareValueSeeded(3), + d = Observable.just(4).shareValueSeeded(4), + e = Observable.just(5).shareValueSeeded(5), + f = Observable.just(6).shareValueSeeded(6); + + final observable = ValueObservable.combineLatest6( + a, + b, + c, + d, + e, + f, + (int first, int second, int third, int fourth, int fifth, int sixth) => + [first, second, third, fourth, fifth, sixth]); + + expect(observable.value, equals(expected)); + + observable.listen(expectAsync1((result) { + expect(result, expected); + }, count: 2)); + }); + + test('rx.ValueObservable.combineLatest7', () async { + const expected = [1, 2, 3, 4, 5, 6, 7]; + + var a = Observable.just(1).shareValueSeeded(1), + b = Observable.just(2).shareValueSeeded(2), + c = Observable.just(3).shareValueSeeded(3), + d = Observable.just(4).shareValueSeeded(4), + e = Observable.just(5).shareValueSeeded(5), + f = Observable.just(6).shareValueSeeded(6), + g = Observable.just(7).shareValueSeeded(7); + + final observable = ValueObservable.combineLatest7( + a, + b, + c, + d, + e, + f, + g, + (int first, int second, int third, int fourth, int fifth, int sixth, + int seventh) => + [first, second, third, fourth, fifth, sixth, seventh]); + + expect(observable.value, equals(expected)); + + observable.listen(expectAsync1((result) { + expect(result, expected); + }, count: 2)); + }); + + test('rx.ValueObservable.combineLatest8', () async { + const expected = [1, 2, 3, 4, 5, 6, 7, 8]; + + var a = Observable.just(1).shareValueSeeded(1), + b = Observable.just(2).shareValueSeeded(2), + c = Observable.just(3).shareValueSeeded(3), + d = Observable.just(4).shareValueSeeded(4), + e = Observable.just(5).shareValueSeeded(5), + f = Observable.just(6).shareValueSeeded(6), + g = Observable.just(7).shareValueSeeded(7), + h = Observable.just(8).shareValueSeeded(8); + + final observable = ValueObservable.combineLatest8( + a, + b, + c, + d, + e, + f, + g, + h, + (int first, int second, int third, int fourth, int fifth, int sixth, + int seventh, int eighth) => + [first, second, third, fourth, fifth, sixth, seventh, eighth]); + + expect(observable.value, equals(expected)); + + observable.listen(expectAsync1((result) { + expect(result, expected); + }, count: 2)); + }); + + test('rx.ValueObservable.combineLatest9', () async { + const expected = [1, 2, 3, 4, 5, 6, 7, 8, 9]; + + var a = Observable.just(1).shareValueSeeded(1), + b = Observable.just(2).shareValueSeeded(2), + c = Observable.just(3).shareValueSeeded(3), + d = Observable.just(4).shareValueSeeded(4), + e = Observable.just(5).shareValueSeeded(5), + f = Observable.just(6).shareValueSeeded(6), + g = Observable.just(7).shareValueSeeded(7), + h = Observable.just(8).shareValueSeeded(8), + i = Observable.just(9).shareValueSeeded(9); + + final observable = ValueObservable.combineLatest9( + a, + b, + c, + d, + e, + f, + g, + h, + i, + (int first, int second, int third, int fourth, int fifth, int sixth, + int seventh, int eighth, int ninth) => + [ + first, + second, + third, + fourth, + fifth, + sixth, + seventh, + eighth, + ninth + ]); + + expect(observable.value, equals(expected)); + + observable.listen(expectAsync1((result) { + expect(result, expected); + }, count: 2)); + }); } From 1c96b9fea7901caa994bd9845142f5846b0c96b7 Mon Sep 17 00:00:00 2001 From: Luke Pighetti Date: Thu, 26 Sep 2019 08:48:14 -0400 Subject: [PATCH 2/3] remove unecessary import --- test/streams/combine_latest_test.dart | 1 - 1 file changed, 1 deletion(-) diff --git a/test/streams/combine_latest_test.dart b/test/streams/combine_latest_test.dart index ba9b35728..307c825a5 100644 --- a/test/streams/combine_latest_test.dart +++ b/test/streams/combine_latest_test.dart @@ -1,7 +1,6 @@ import 'dart:async'; import 'package:rxdart/rxdart.dart'; -import 'package:rxdart/src/observables/value_observable.dart'; import 'package:test/test.dart'; Stream get streamA => From 33f94dc3b4f31dd79ad107691eefef3fbb4d4e95 Mon Sep 17 00:00:00 2001 From: Luke Pighetti Date: Thu, 26 Sep 2019 08:53:06 -0400 Subject: [PATCH 3/3] more unecessary imports --- lib/src/observables/value_observable.dart | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/src/observables/value_observable.dart b/lib/src/observables/value_observable.dart index b02f96e29..1e8ef6334 100644 --- a/lib/src/observables/value_observable.dart +++ b/lib/src/observables/value_observable.dart @@ -1,5 +1,3 @@ -import 'dart:async'; - import 'package:rxdart/src/observables/observable.dart'; import 'package:rxdart/streams.dart';