Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ internal interface IQueryLanguage
IObservable<TSource> SkipUntil<TSource, TOther>(IObservable<TSource> source, IObservable<TOther> other);
IObservable<TSource> Switch<TSource>(IObservable<IObservable<TSource>> sources);
IObservable<TSource> TakeUntil<TSource, TOther>(IObservable<TSource> source, IObservable<TOther> other);
IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> source, Func<TSource, bool> stopPredicate);
IObservable<IObservable<TSource>> Window<TSource, TWindowClosing>(IObservable<TSource> source, Func<IObservable<TWindowClosing>> windowClosingSelector);
IObservable<IObservable<TSource>> Window<TSource, TWindowOpening, TWindowClosing>(IObservable<TSource> source, IObservable<TWindowOpening> windowOpenings, Func<TWindowOpening, IObservable<TWindowClosing>> windowClosingSelector);
IObservable<IObservable<TSource>> Window<TSource, TWindowBoundary>(IObservable<TSource> source, IObservable<TWindowBoundary> windowBoundaries);
Expand Down
29 changes: 29 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable.Multiple.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1447,6 +1447,35 @@ public static IObservable<TSource> TakeUntil<TSource, TOther>(this IObservable<T
return s_impl.TakeUntil<TSource, TOther>(source, other);
}

/// <summary>
/// Relays elements from the source observable sequence and calls the predicate after an
/// emission to check if the sequence should stop after that specific item.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source and result sequences.</typeparam>
/// <param name="source">The source sequence to relay elements of.</param>
/// <param name="stopPredicate">Called after each upstream item has been emitted with
/// that upstream item and should return <code>true</code> to indicate the sequence should
/// complete.</param>
/// <returns>The observable sequence with the source elements until the stop predicate returns true.</returns>
/// <example>
/// The following sequence will stop after the value 5 has been encountered:
/// <code>
/// Observable.Range(1, 10)
/// .TakeUntil(item =&gt; item == 5)
/// .Subscribe(Console.WriteLine);
/// </code>
/// </example>
/// <exception cref="ArgumentException">If <typeparamref name="TSource"/> or <paramref name="stopPredicate"/> is <code>null</code>.</exception>
public static IObservable<TSource> TakeUntil<TSource>(this IObservable<TSource> source, Func<TSource, bool> stopPredicate)
{
if (source == null)
throw new ArgumentNullException(nameof(source));
if (stopPredicate == null)
throw new ArgumentNullException(nameof(stopPredicate));

return s_impl.TakeUntil(source, stopPredicate);
}

#endregion

#region + Window +
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.Text;

namespace System.Reactive.Linq.ObservableImpl
{
/// <summary>
/// Relays items to the downstream until the predicate returns <code>true</code>.
/// </summary>
/// <typeparam name="TSource">The element type of the sequence</typeparam>
internal sealed class TakeUntilPredicate<TSource> :
Producer<TSource, TakeUntilPredicate<TSource>.TakeUntilPredicateObserver>
{
readonly IObservable<TSource> _source;

readonly Func<TSource, bool> _stopPredicate;

public TakeUntilPredicate(IObservable<TSource> source, Func<TSource, bool> stopPredicate)
{
this._source = source;
this._stopPredicate = stopPredicate;
}

protected override TakeUntilPredicateObserver CreateSink(IObserver<TSource> observer) => new TakeUntilPredicateObserver(observer, _stopPredicate);

protected override void Run(TakeUntilPredicateObserver sink) => sink.Run(_source);

internal sealed class TakeUntilPredicateObserver : IdentitySink<TSource>
{
readonly Func<TSource, bool> _stopPredicate;

public TakeUntilPredicateObserver(IObserver<TSource> downstream,
Func<TSource, bool> predicate) : base (downstream)
{
this._stopPredicate = predicate;
}

public override void OnCompleted()
{
ForwardOnCompleted();
}

public override void OnError(Exception error)
{
ForwardOnError(error);
}

public override void OnNext(TSource value)
{
ForwardOnNext(value);

var shouldStop = false;
try
{
shouldStop = _stopPredicate(value);
}
catch (Exception ex)
{
ForwardOnError(ex);
return;
}
if (shouldStop)
{
ForwardOnCompleted();
}
}
}
}
}
40 changes: 40 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15015,6 +15015,46 @@ public static IQbservable<TSource> TakeUntil<TSource, TOther>(this IQbservable<T
);
}

/// <summary>
/// Relays elements from the source observable sequence and calls the predicate after an
/// emission to check if the sequence should stop after that specific item.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source and result sequences.</typeparam>
/// <param name="source">The source sequence to relay elements of.</param>
/// <param name="stopPredicate">Called after each upstream item has been emitted with
/// that upstream item and should return <code>true</code> to indicate the sequence should
/// complete.</param>
/// <returns>The observable sequence with the source elements until the stop predicate returns true.</returns>
/// <example>
/// The following sequence will stop after the value 5 has been encountered:
/// <code>
/// Observable.Range(1, 10)
/// .TakeUntil(item =&gt; item == 5)
/// .Subscribe(Console.WriteLine);
/// </code>
/// </example>
/// <exception cref="ArgumentException">If <typeparamref name="TSource"/> or <paramref name="stopPredicate"/> is <code>null</code>.</exception>
public static IQbservable<TSource> TakeUntil<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, bool>> stopPredicate)
{
if (source == null)
throw new ArgumentNullException(nameof(source));
if (stopPredicate == null)
throw new ArgumentNullException(nameof(stopPredicate));

return source.Provider.CreateQuery<TSource>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
InfoOf(() => Qbservable.TakeUntil<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, bool>>))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
source.Expression,
stopPredicate
)
);
}

/// <summary>
/// Returns elements from an observable sequence as long as a specified condition is true.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,11 @@ public virtual IObservable<TSource> TakeUntil<TSource, TOther>(IObservable<TSour
return new TakeUntil<TSource, TOther>(source, other);
}

public virtual IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> source, Func<TSource, bool> stopPredicate)
{
return new TakeUntilPredicate<TSource>(source, stopPredicate);
}

#endregion

#region + Window +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1392,6 +1392,7 @@ namespace System.Reactive.Linq
public static System.IObservable<System.Collections.Generic.IList<TSource>> TakeLastBuffer<TSource>(this System.IObservable<TSource> source, System.TimeSpan duration) { }
public static System.IObservable<System.Collections.Generic.IList<TSource>> TakeLastBuffer<TSource>(this System.IObservable<TSource> source, System.TimeSpan duration, System.Reactive.Concurrency.IScheduler scheduler) { }
public static System.IObservable<TSource> TakeUntil<TSource, TOther>(this System.IObservable<TSource> source, System.IObservable<TOther> other) { }
public static System.IObservable<TSource> TakeUntil<TSource>(this System.IObservable<TSource> source, System.Func<TSource, bool> stopPredicate) { }
public static System.IObservable<TSource> TakeUntil<TSource>(this System.IObservable<TSource> source, System.DateTimeOffset endTime) { }
public static System.IObservable<TSource> TakeUntil<TSource>(this System.IObservable<TSource> source, System.DateTimeOffset endTime, System.Reactive.Concurrency.IScheduler scheduler) { }
public static System.IObservable<TSource> TakeWhile<TSource>(this System.IObservable<TSource> source, System.Func<TSource, bool> predicate) { }
Expand Down Expand Up @@ -2097,6 +2098,7 @@ namespace System.Reactive.Linq
public static System.Reactive.Linq.IQbservable<TSource> TakeUntil<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.DateTimeOffset endTime) { }
public static System.Reactive.Linq.IQbservable<TSource> TakeUntil<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.DateTimeOffset endTime, System.Reactive.Concurrency.IScheduler scheduler) { }
public static System.Reactive.Linq.IQbservable<TSource> TakeUntil<TSource, TOther>(this System.Reactive.Linq.IQbservable<TSource> source, System.IObservable<TOther> other) { }
public static System.Reactive.Linq.IQbservable<TSource> TakeUntil<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<TSource, bool>> stopPredicate) { }
public static System.Reactive.Linq.IQbservable<TSource> TakeWhile<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<TSource, bool>> predicate) { }
public static System.Reactive.Linq.IQbservable<TSource> TakeWhile<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<TSource, int, bool>> predicate) { }
public static System.Reactive.Joins.QueryablePlan<TResult> Then<TSource, TResult>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<TSource, TResult>> selector) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -685,5 +685,182 @@ public void TakeUntil_Default()

#endregion

#region + Predicate +

[Fact]
public void TakeUntil_Predicate_ArgumentChecking()
{
ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeUntil<int>(null, v => true));
ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeUntil<int>(DummyObservable<int>.Instance, null));
}

[Fact]
public void TakeUntil_Predicate_Basic()
{
var scheduler = new TestScheduler();

var source = scheduler.CreateColdObservable(
OnNext(10, 1),
OnNext(20, 2),
OnNext(30, 3),
OnNext(40, 4),
OnNext(50, 5),
OnNext(60, 6),
OnNext(70, 7),
OnNext(80, 8),
OnNext(90, 9),
OnCompleted<int>(100)
);

var result = scheduler.Start(() => source.TakeUntil(v => v == 5));

result.Messages.AssertEqual(
OnNext(210, 1),
OnNext(220, 2),
OnNext(230, 3),
OnNext(240, 4),
OnNext(250, 5),
OnCompleted<int>(250)
);

source.Subscriptions.AssertEqual(
Subscribe(200, 250)
);
}

[Fact]
public void TakeUntil_Predicate_True()
{
var scheduler = new TestScheduler();

var source = scheduler.CreateColdObservable(
OnNext(10, 1),
OnNext(20, 2),
OnNext(30, 3),
OnNext(40, 4),
OnNext(50, 5),
OnNext(60, 6),
OnNext(70, 7),
OnNext(80, 8),
OnNext(90, 9),
OnCompleted<int>(100)
);

var result = scheduler.Start(() => source.TakeUntil(v => true));

result.Messages.AssertEqual(
OnNext(210, 1),
OnCompleted<int>(210)
);

source.Subscriptions.AssertEqual(
Subscribe(200, 210)
);
}

[Fact]
public void TakeUntil_Predicate_False()
{
var scheduler = new TestScheduler();

var source = scheduler.CreateColdObservable(
OnNext(10, 1),
OnNext(20, 2),
OnNext(30, 3),
OnNext(40, 4),
OnNext(50, 5),
OnNext(60, 6),
OnNext(70, 7),
OnNext(80, 8),
OnNext(90, 9),
OnCompleted<int>(100)
);

var result = scheduler.Start(() => source.TakeUntil(v => false));

result.Messages.AssertEqual(
OnNext(210, 1),
OnNext(220, 2),
OnNext(230, 3),
OnNext(240, 4),
OnNext(250, 5),
OnNext(260, 6),
OnNext(270, 7),
OnNext(280, 8),
OnNext(290, 9),
OnCompleted<int>(300)
);

source.Subscriptions.AssertEqual(
Subscribe(200, 300)
);
}

[Fact]
public void TakeUntil_Predicate_Error()
{
var scheduler = new TestScheduler();

var ex = new InvalidOperationException();

var source = scheduler.CreateColdObservable(
OnNext(10, 1),
OnNext(20, 2),
OnNext(30, 3),
OnError<int>(40, ex)
);

var result = scheduler.Start(() => source.TakeUntil(v => false));

result.Messages.AssertEqual(
OnNext(210, 1),
OnNext(220, 2),
OnNext(230, 3),
OnError<int>(240, ex)
);

source.Subscriptions.AssertEqual(
Subscribe(200, 240)
);
}

[Fact]
public void TakeUntil_Predicate_Crash()
{
var scheduler = new TestScheduler();

var ex = new InvalidOperationException();

var source = scheduler.CreateColdObservable(
OnNext(10, 1),
OnNext(20, 2),
OnNext(30, 3),
OnNext(240, 4),
OnNext(250, 5),
OnCompleted<int>(260)
);

var result = scheduler.Start(() => source.TakeUntil(v => {
if (v == 3)
{
throw ex;
}
return false;
}));

result.Messages.AssertEqual(
OnNext(210, 1),
OnNext(220, 2),
OnNext(230, 3),
OnError<int>(230, ex)
);

source.Subscriptions.AssertEqual(
Subscribe(200, 230)
);
}

#endregion

}
}