Skip to content

Commit 4a801c6

Browse files
akarnokddanielcweber
authored andcommitted
Have TakeUntil(time) use lock-free methods. (#611)
1 parent ad99fb5 commit 4a801c6

File tree

1 file changed

+14
-31
lines changed
  • Rx.NET/Source/src/System.Reactive/Linq/Observable

1 file changed

+14
-31
lines changed

Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeUntil.cs

Lines changed: 14 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ public TakeUntil(IObservable<TSource> source, IObservable<TOther> other)
2525

2626
internal sealed class _ : IdentitySink<TSource>
2727
{
28-
private IDisposable _mainDisposable;
2928
private IDisposable _otherDisposable;
3029
private int _halfSerializer;
3130
private Exception _error;
@@ -38,16 +37,15 @@ public _(IObserver<TSource> observer)
3837
public void Run(TakeUntil<TSource, TOther> parent)
3938
{
4039
Disposable.SetSingle(ref _otherDisposable, parent._other.Subscribe(new OtherObserver(this)));
41-
Disposable.SetSingle(ref _mainDisposable, parent._source.Subscribe(this));
40+
base.Run(parent._source);
4241
}
4342

4443
protected override void Dispose(bool disposing)
4544
{
4645
if (disposing)
4746
{
48-
if (!Disposable.GetIsDisposed(ref _mainDisposable))
47+
if (!Disposable.GetIsDisposed(ref _otherDisposable))
4948
{
50-
Disposable.TryDispose(ref _mainDisposable);
5149
Disposable.TryDispose(ref _otherDisposable);
5250
}
5351
}
@@ -99,11 +97,6 @@ public void OnNext(TOther value)
9997
}
10098
}
10199

102-
internal static class TakeUntilTerminalException
103-
{
104-
internal static readonly Exception Instance = new Exception("No further exceptions");
105-
}
106-
107100
internal sealed class TakeUntil<TSource> : Producer<TSource, TakeUntil<TSource>._>
108101
{
109102
private readonly IObservable<TSource> _source;
@@ -140,61 +133,51 @@ public IObservable<TSource> Combine(DateTimeOffset endTime)
140133

141134
internal sealed class _ : IdentitySink<TSource>
142135
{
143-
private readonly object _gate = new object();
136+
private IDisposable _timerDisposable;
137+
138+
private int _wip;
139+
140+
private Exception _error;
144141

145142
public _(IObserver<TSource> observer)
146143
: base(observer)
147144
{
148145
}
149146

150-
private IDisposable _sourceDisposable;
151-
152147
public void Run(TakeUntil<TSource> parent)
153148
{
154-
SetUpstream(parent._scheduler.Schedule(this, parent._endTime, (_, state) => state.Tick()));
155-
Disposable.SetSingle(ref _sourceDisposable, parent._source.SubscribeSafe(this));
149+
Disposable.SetSingle(ref _timerDisposable, parent._scheduler.Schedule(this, parent._endTime, (_, state) => state.Tick()));
150+
base.Run(parent._source);
156151
}
157152

158153
protected override void Dispose(bool disposing)
159154
{
160155
if (disposing)
161156
{
162-
Disposable.TryDispose(ref _sourceDisposable);
157+
Disposable.TryDispose(ref _timerDisposable);
163158
}
164159
base.Dispose(disposing);
165160
}
166161

167162
private IDisposable Tick()
168163
{
169-
lock (_gate)
170-
{
171-
ForwardOnCompleted();
172-
}
164+
OnCompleted();
173165
return Disposable.Empty;
174166
}
175167

176168
public override void OnNext(TSource value)
177169
{
178-
lock (_gate)
179-
{
180-
ForwardOnNext(value);
181-
}
170+
HalfSerializer.ForwardOnNext(this, value, ref _wip, ref _error);
182171
}
183172

184173
public override void OnError(Exception error)
185174
{
186-
lock (_gate)
187-
{
188-
ForwardOnError(error);
189-
}
175+
HalfSerializer.ForwardOnError(this, error, ref _wip, ref _error);
190176
}
191177

192178
public override void OnCompleted()
193179
{
194-
lock (_gate)
195-
{
196-
ForwardOnCompleted();
197-
}
180+
HalfSerializer.ForwardOnCompleted(this, ref _wip, ref _error);
198181
}
199182
}
200183
}

0 commit comments

Comments
 (0)