Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 9 additions & 24 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeUntil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,6 @@ public void OnNext(TOther value)
}
}

internal static class TakeUntilTerminalException
{
internal static readonly Exception Instance = new Exception("No further exceptions");
}

internal sealed class TakeUntil<TSource> : Producer<TSource, TakeUntil<TSource>._>
{
private readonly IObservable<TSource> _source;
Expand Down Expand Up @@ -140,15 +135,17 @@ public IObservable<TSource> Combine(DateTimeOffset endTime)

internal sealed class _ : IdentitySink<TSource>
{
private readonly object _gate = new object();
private IDisposable _sourceDisposable;

private int _wip;

private Exception _error;

public _(IObserver<TSource> observer)
: base(observer)
{
}

private IDisposable _sourceDisposable;

public void Run(TakeUntil<TSource> parent)
{
SetUpstream(parent._scheduler.Schedule(this, parent._endTime, (_, state) => state.Tick()));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use the upstream for source subscription, and use the local disposable for the timer? Just to be more in line with the other operators.

Expand All @@ -166,35 +163,23 @@ protected override void Dispose(bool disposing)

private IDisposable Tick()
{
lock (_gate)
{
ForwardOnCompleted();
}
OnCompleted();
return Disposable.Empty;
}

public override void OnNext(TSource value)
{
lock (_gate)
{
ForwardOnNext(value);
}
HalfSerializer.ForwardOnNext(this, value, ref _wip, ref _error);
}

public override void OnError(Exception error)
{
lock (_gate)
{
ForwardOnError(error);
}
HalfSerializer.ForwardOnError(this, error, ref _wip, ref _error);
}

public override void OnCompleted()
{
lock (_gate)
{
ForwardOnCompleted();
}
HalfSerializer.ForwardOnCompleted(this, ref _wip, ref _error);
}
}
}
Expand Down