Description
Bug
Which library version?
6.1.0, 7.0.0-preview.1
What are the platform(s), environment(s) and related component version(s)?
I've tested on Linux x64 and arm64
What is the use case or problem?
There appears to be a race condition in the RefCount implementation when it is used like this:
src.Multicast<StatelessSubject>().RefCount().ObserveOn(TaskPoolScheduler.Default).Retry()
When an error occurs, the retry attempts race with each other and can leave RefCount in a state where downstream subscriptions exist but the connection to the source observable is never recreated. A more complete example:
using System;
using System.Collections.Immutable;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

// Simulate polling some remote device
var src = Observable.Create<Int32>(async (observer, ct) =>
    {
        Console.WriteLine("START");
        Int32 i = 0;
        while (!ct.IsCancellationRequested)
        {
            Console.WriteLine(++i);
            observer.OnNext(i);
            await Task.Delay(500);
            // Simulate connection failure
            if (i > 5)
                throw new InvalidOperationException();
        }
    })
    .SubscribeOn(TaskPoolScheduler.Default);
src = src.Multicast(new StatelessSubject<Int32>())
    .RefCount();
const Int32 parallelCount = 12;
for (Int32 i = 0; i < parallelCount; i++)
{
    Int32 i1 = i;
    src
        .ObserveOn(TaskPoolScheduler.Default)
        .Do(_ => {},
            ex => { Console.WriteLine($"Error {i1}"); }, // Simulate handling the error in a downstream consumer
            () => Console.WriteLine("Completed"))
        .Retry() // This Retry can't simply be moved before .Multicast, since downstream consumers might need to know when errors occur (to reset their own internal business logic, etc.)
        .Subscribe();
}
Thread.Sleep(-1);
/// "Dumb" subject that doesn't remember error state, taken from https://stackoverflow.com/a/64991229
public class StatelessSubject<T> : ISubject<T>, ISubject<T, T>, IObserver<T>, IObservable<T>
{
    private IImmutableList<IObserver<T>> _observers = ImmutableArray<IObserver<T>>.Empty;
    public IDisposable Subscribe(IObserver<T> observer)
    {
        ImmutableInterlocked.Update(ref _observers, x => x.Add(observer));
        return Disposable.Create(() => ImmutableInterlocked.Update(ref _observers, (Func<IImmutableList<IObserver<T>>, IImmutableList<IObserver<T>>>)(x => x.Remove(observer))));
    }
    public void OnNext(T value)
    {
        foreach (var observer in Volatile.Read(ref _observers))
            observer.OnNext(value);
    }
    public void OnError(Exception error)
    {
        foreach (var observer in Interlocked.Exchange(ref _observers, []))
            observer.OnError(error);
    }
    public void OnCompleted()
    {
        foreach (var observer in Interlocked.Exchange(ref _observers, []))
            observer.OnCompleted();
    }
}
With a large enough parallelCount, it gets stuck after a few iterations.
What is the expected outcome?
When an error occurs in the source, I'd expect RefCount to atomically reset its state (clear the internal subscription to the source and all observers), propagate the error to the "old" observers, and be ready to immediately accept subscriptions from new observers, restarting the source subscription anew.
What is the actual outcome?
The RefCount operator enters an incorrect state in which it has active downstream subscriptions but does not create an upstream subscription.
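One possible way to confirm this state from the outside, rather than inferring it from the missing "START" output, is to count live subscriptions on both sides of RefCount. The sketch below is only an illustration: SubscriptionProbe and its members are hypothetical names and not part of Rx.NET, and it relies only on Observable.Defer and Finally. It also assumes that a brief window with zero upstream subscriptions is normal while a retry is in flight, so only a reading that stays true would point at the problem.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

// Hypothetical diagnostic helper (not part of Rx.NET): counts live
// subscriptions upstream and downstream of a RefCount stage.
public sealed class SubscriptionProbe
{
    private int _upstream;   // live subscriptions to the original source
    private int _downstream; // live subscriptions made by consumers

    public int UpstreamCount => Volatile.Read(ref _upstream);
    public int DownstreamCount => Volatile.Read(ref _downstream);

    // True while consumers are subscribed but nothing is subscribed to the source.
    // The two counters are read separately, so this is a diagnostic hint only;
    // short-lived true readings during a retry are expected.
    public bool LooksStuck => DownstreamCount > 0 && UpstreamCount == 0;

    // Apply to the source before Multicast.
    public IObservable<T> Upstream<T>(IObservable<T> source) =>
        Observable.Defer(() =>
        {
            Interlocked.Increment(ref _upstream);
            // Finally runs on OnError, OnCompleted, or disposal of the subscription.
            return source.Finally(() => Interlocked.Decrement(ref _upstream));
        });

    // Apply to the shared observable after RefCount, once per consumer.
    public IObservable<T> Downstream<T>(IObservable<T> shared) =>
        Observable.Defer(() =>
        {
            Interlocked.Increment(ref _downstream);
            return shared.Finally(() => Interlocked.Decrement(ref _downstream));
        });
}
```

In the reproduction above, this would mean wrapping the polling observable with probe.Upstream(...) before .Multicast(...), wrapping src with probe.Downstream(src) before .ObserveOn(...) inside the loop, and logging probe.LooksStuck periodically in place of Thread.Sleep(-1).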