-
Notifications
You must be signed in to change notification settings - Fork 766
Description
Hello, we believe we have identified a race condition in the Throttle
operator:
Which library version?
Reproduced in 4.1.0 and 5.0.0
What are the platform(s), environment(s) and related component version(s)?
Reproduced with
Windows 10 20H2
.NET 5.0 (with system.reactive 5.0.0)
.NET Framework 4.8 (with system.reactive 4.1.0)
What is the use case or problem?
If a second event arrives at the Throttle(TimeSpan dueTime)
operator at almost exactly the same time as a cached event is due to be propagated, then neither will be emitted. I have reproduced this with Throttle(TimeSpan.Zero)
but believe that it could happen with any timespan if a second or subsequent event arrives at an inopportune time.
A review of the code suggests the following interaction can occur between Throttle._.OnNext and Throttle._.Propagate
public override void OnNext(TSource value)
{
ulong currentid;
lock (_gate)
{
_hasValue = true;
_value = value;
_id = unchecked(_id + 1);
currentid = _id;
}
_serialCancelable.Disposable = null;
_serialCancelable.Disposable = _scheduler.ScheduleAction((@this: this, currentid), _dueTime, static tuple => tuple.@this.Propagate(tuple.currentid));
}
private void Propagate(ulong currentid)
{
lock (_gate)
{
if (_hasValue && _id == currentid)
{
ForwardOnNext(_value!);
}
_hasValue = false;
}
If two items arrive at OnNext
in quick succession:
OnNext(1):
- hasValue = true, value = 1, id = 100 (let's say)
- schedules Propagate(1)
OnNext(2):
- hasValue = true, value = 2, id = 101
- schedules Propagate(2)
Propagate(1):
- hasValue is true but id doesn't match so doesn't forward value
- clears hasValue
Propagate(2):
- id matches but hasValue is false, so doesn't forward value
What is the expected outcome?
An event is emitted if no further events are received within the Throttle dueTime.
What is the actual outcome?
No event is emitted
What is the stacktrace of the exception(s) if any?
N/A
Do you have a code snippet or project that reproduces the problem?
Example .NET 5 console app referencing System.Reactive 5.0.0
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
while (true)
{
var source = Observable.Create<int>(o =>
{
o.OnNext(1);
o.OnNext(2);
return Disposable.Empty;
});
var i = 0;
try
{
while (true)
{
i++;
await source
.Throttle(TimeSpan.Zero)
.Timeout(TimeSpan.FromMilliseconds(100))
.Take(1);
}
}
catch (TimeoutException)
{
Console.WriteLine($"Failed after {i} iterations");
}
}