Description
Which library version?
System.Reactive 6.0.0
What are the platform(s), environment(s) and related component version(s)?
dotnet 7.0.203
What is the use case or problem?
We are processing an observable that contains sensor samples taken from many different sensors. We want to gather statistics related to each individual sensor, and so perform a GroupBy() on the observable to create per-sensor observables. There is a long delay between the observable completing and the final subscriber completing during which there is 100% CPU usage.
A simple reproducer has been created based on an observable:
- Constructed from an array of m elements.
- Grouped using `GroupBy()` into an `IObservable<IGroupedObservable<...>>` with n groups.
- Merged together using `Merge()` or `SelectMany()`.
- Subscribed.
For example:
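```csharp
using System.Reactive.Linq;

// Flatten the per-group observables with SelectMany().
void Test(int[] data, int numberOfGroups)
{
    data.ToObservable()
        .GroupBy(value => value % numberOfGroups)
        .SelectMany(groupOfInts => groupOfInts)
        .Subscribe(intValue => { });
}
```

or

```csharp
using System.Reactive.Linq;

// Flatten the per-group observables with Merge().
void Test(int[] data, int numberOfGroups)
{
    data.ToObservable()
        .GroupBy(value => value % numberOfGroups)
        .Merge()
        .Subscribe(intValue => { });
}
```

What is the expected outcome?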
The time taken to complete is O(m) where m is the number of elements in the array.
What is the actual outcome?
The time taken to complete is O(n^2) where n is the number of groups that were allocated (numberOfGroups from the above example).
What is the stacktrace of the exception(s) if any?
Running under a profiler, the problem appears to be that when each IGroupedObservable completes, the inner observer created by SelectMany() or Merge() is individually removed from a CompositeDisposable. Each removal performs a linear search (System.Array.IndexOf in the profile below) of a list inside the CompositeDisposable that holds one entry per group. Because the entries are removed one after another, the total removal cost is O(n^2) in the number of groups.
Profiler output:

```text
23.2% <Main>$ • 1,659 ms • Program.<Main>$(String[])
22.7% GroupByMerge • 1,624 ms • RxPerf.RxGroupByMicroBenchmark.GroupByMerge()
22.6% Subscribe • 1,617 ms • System.ObservableExtensions.Subscribe(IObservable, Action)
22.6% Subscribe • 1,617 ms • System.Reactive.Producer`2.Subscribe(IObserver)
22.6% SubscribeRaw • 1,617 ms • System.Reactive.Producer`2.SubscribeRaw(IObserver, Boolean)
22.6% ScheduleAction • 1,617 ms • System.Reactive.Concurrency.Scheduler.ScheduleAction(IScheduler, TState, Action)
22.6% Schedule • 1,617 ms • System.Reactive.Concurrency.LocalScheduler.Schedule(TState, Func)
22.6% Schedule • 1,617 ms • System.Reactive.Concurrency.CurrentThreadScheduler.Schedule(TState, TimeSpan, Func)
22.5% Run • 1,610 ms • System.Reactive.Concurrency.CurrentThreadScheduler+Trampoline.Run(SchedulerQueue)
22.1% InvokeCore • 1,585 ms • System.Reactive.Concurrency.ScheduledItem`2.InvokeCore()
22.1% <LoopRec>b__5_0 • 1,585 ms • System.Reactive.Linq.ObservableImpl.ToObservableRecursive`1+_+<>c.<LoopRec>b__5_0(IScheduler, _)
22.1% LoopRec • 1,579 ms • System.Reactive.Linq.ObservableImpl.ToObservableRecursive`1+_.LoopRec(IScheduler)
20.4% ForwardOnCompleted • 1,459 ms • System.Reactive.Sink`1.ForwardOnCompleted()
20.4% OnCompleted • 1,459 ms • System.Reactive.Linq.ObservableImpl.GroupBy`3+_.OnCompleted()
20.4% OnCompleted • 1,459 ms • System.Reactive.Subjects.Subject`1.OnCompleted()
18.2% OnCompletedCore • 1,302 ms • System.Reactive.AutoDetachObserver`1.OnCompletedCore()
18.2% OnCompleted • 1,302 ms • System.Reactive.Linq.ObservableImpl.Merge`1+Observables+_+InnerObserver.OnCompleted()
18.2% Remove • 1,302 ms • System.Reactive.Disposables.CompositeDisposable.Remove(IDisposable)
17.8% IndexOf • 1,273 ms • System.Array.IndexOf(T[], T, Int32, Int32)
```
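The removal pattern described above can be modelled in isolation. The sketch below is a hypothetical, simplified cost model, not the Rx.NET `CompositeDisposable` source; the names `RemovalCostModel`, `LinearRemovalComparisons` and `ExpectedComparisons` are illustrative only. It assumes that the groups complete in the order they were created and that removed slots are left in place rather than compacted. Under those assumptions, removing the k-th entry inspects k slots, so removing all n entries costs 1 + 2 + ... + n = n(n + 1)/2 comparisons.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical cost model (not Rx.NET source): n inner subscriptions are added
// to a list and later removed one by one using a front-to-back linear search,
// in the style of List<T>.IndexOf.
public static class RemovalCostModel
{
    // Marker written into a slot once its entry has been removed. Slots are
    // never compacted here; the real CompositeDisposable may differ.
    private static readonly object Removed = new object();

    // Returns the total number of slots inspected while removing all n entries.
    public static long LinearRemovalComparisons(int n)
    {
        var slots = new List<object>(n);
        for (var i = 0; i < n; i++)
        {
            slots.Add(new object()); // one entry per group subscription
        }

        // Assumption: groups complete in the order in which they were created.
        var toRemove = slots.ToArray();
        long comparisons = 0;
        foreach (var item in toRemove)
        {
            // Linear search from the front of the list, as IndexOf does.
            for (var j = 0; j < slots.Count; j++)
            {
                comparisons++;
                if (ReferenceEquals(slots[j], item))
                {
                    slots[j] = Removed;
                    break;
                }
            }
        }
        return comparisons;
    }

    // Closed form of the count above: 1 + 2 + ... + n = n(n + 1) / 2.
    public static long ExpectedComparisons(int n) => (long)n * (n + 1) / 2;
}
```

For example, `RemovalCostModel.LinearRemovalComparisons(1000)` returns 500500, matching `ExpectedComparisons(1000)`; doubling n to 2000 gives 2001000, roughly four times as many comparisons.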
Do you have a code snippet or project that reproduces the problem?
The following class runs under BenchmarkDotNet to exhibit the issue:
```csharp
using System.Reactive.Linq;
using BenchmarkDotNet.Attributes;

namespace RxPerf;

[MemoryDiagnoser]
public class RxGroupByMicroBenchmark
{
    [Params(200_000, 1_000_000)]
    public int NumberOfSamples { get; set; }

    [Params(10, 100, 1_000, 10_000, 100_000, 150_000, 200_000)]
    public int NumberOfGroups { get; set; }

    private int[] data = Array.Empty<int>();

    [GlobalSetup]
    public void GlobalSetup()
    {
        data = new int[NumberOfSamples];
        for (var i = 0; i < data.Length; ++i)
        {
            data[i] = i;
        }
    }

    private IObservable<int>? observable;

    [IterationSetup]
    public void IterationSetup()
    {
        observable = data.ToObservable();
    }

    [Benchmark]
    public void GroupBySelectMany()
    {
        var numberOfGroups = NumberOfGroups;
        observable!.GroupBy(value => value % numberOfGroups)
            .SelectMany(groupOfInts => groupOfInts)
            .Subscribe(intValue => { });
    }

    [Benchmark]
    public void GroupByMerge()
    {
        var numberOfGroups = NumberOfGroups;
        observable!.GroupBy(value => value % numberOfGroups)
            .Merge()
            .Subscribe(intValue => { });
    }
}
```

Sample output is:

| Method | NumberOfSamples | NumberOfGroups | Mean | Error | StdDev | Median | Gen0 | Gen1 | Gen2 | Allocated |
|------------------ |---------------- |--------------- |------------:|----------:|----------:|------------:|-----------:|----------:|----------:|----------:|
| GroupBySelectMany | 200000 | 10 | 26.77 ms | 0.534 ms | 1.378 ms | 26.29 ms | 2000.0000 | - | - | 12.21 MB |
| GroupByMerge | 200000 | 10 | 26.46 ms | 0.523 ms | 1.020 ms | 26.20 ms | 2000.0000 | - | - | 12.21 MB |
| GroupBySelectMany | 200000 | 100 | 27.71 ms | 0.548 ms | 1.030 ms | 27.48 ms | 2000.0000 | - | - | 12.25 MB |
| GroupByMerge | 200000 | 100 | 26.99 ms | 0.530 ms | 0.589 ms | 26.82 ms | 2000.0000 | - | - | 12.25 MB |
| GroupBySelectMany | 200000 | 1000 | 28.64 ms | 0.556 ms | 0.882 ms | 28.42 ms | 2000.0000 | 1000.0000 | - | 12.58 MB |
| GroupByMerge | 200000 | 1000 | 28.04 ms | 0.547 ms | 0.692 ms | 28.05 ms | 2000.0000 | 1000.0000 | - | 12.58 MB |
| GroupBySelectMany | 200000 | 10000 | 41.60 ms | 0.823 ms | 1.375 ms | 41.43 ms | 2000.0000 | 1000.0000 | - | 16 MB |
| GroupByMerge | 200000 | 10000 | 42.33 ms | 0.835 ms | 0.928 ms | 42.16 ms | 2000.0000 | 1000.0000 | - | 16 MB |
| GroupBySelectMany | 200000 | 100000 | 475.65 ms | 9.385 ms | 13.157 ms | 469.82 ms | 8000.0000 | 5000.0000 | 2000.0000 | 48.45 MB |
| GroupByMerge | 200000 | 100000 | 468.15 ms | 5.728 ms | 5.078 ms | 467.44 ms | 8000.0000 | 5000.0000 | 2000.0000 | 48.45 MB |
| GroupBySelectMany | 200000 | 150000 | 929.67 ms | 10.310 ms | 9.140 ms | 928.65 ms | 10000.0000 | 6000.0000 | 2000.0000 | 64.04 MB |
| GroupByMerge | 200000 | 150000 | 920.18 ms | 5.883 ms | 4.593 ms | 921.05 ms | 10000.0000 | 6000.0000 | 2000.0000 | 64.04 MB |
| GroupBySelectMany | 200000 | 200000 | 1,502.65 ms | 15.967 ms | 14.936 ms | 1,494.68 ms | 13000.0000 | 8000.0000 | 3000.0000 | 85.29 MB |
| GroupByMerge | 200000 | 200000 | 1,490.90 ms | 12.406 ms | 11.604 ms | 1,488.47 ms | 13000.0000 | 8000.0000 | 3000.0000 | 85.29 MB |
| GroupBySelectMany | 1000000 | 10 | 128.24 ms | 0.648 ms | 0.506 ms | 128.35 ms | 10000.0000 | - | - | 61.04 MB |
| GroupByMerge | 1000000 | 10 | 138.52 ms | 2.745 ms | 6.471 ms | 136.22 ms | 10000.0000 | - | - | 61.04 MB |
| GroupBySelectMany | 1000000 | 100 | 136.67 ms | 2.585 ms | 2.654 ms | 135.71 ms | 10000.0000 | - | - | 61.07 MB |
| GroupByMerge | 1000000 | 100 | 140.28 ms | 1.412 ms | 1.179 ms | 140.16 ms | 10000.0000 | - | - | 61.07 MB |
| GroupBySelectMany | 1000000 | 1000 | 141.69 ms | 2.704 ms | 2.893 ms | 141.58 ms | 10000.0000 | 1000.0000 | - | 61.41 MB |
| GroupByMerge | 1000000 | 1000 | 134.36 ms | 2.269 ms | 2.123 ms | 133.69 ms | 10000.0000 | 1000.0000 | - | 61.41 MB |
| GroupBySelectMany | 1000000 | 10000 | 152.31 ms | 2.960 ms | 2.769 ms | 153.28 ms | 10000.0000 | 1000.0000 | - | 64.83 MB |
| GroupByMerge | 1000000 | 10000 | 150.45 ms | 2.716 ms | 2.668 ms | 150.68 ms | 10000.0000 | 1000.0000 | - | 64.83 MB |
| GroupBySelectMany | 1000000 | 100000 | 578.28 ms | 11.323 ms | 14.722 ms | 572.63 ms | 15000.0000 | 5000.0000 | 1000.0000 | 97.28 MB |
| GroupByMerge | 1000000 | 100000 | 576.79 ms | 10.564 ms | 9.365 ms | 573.79 ms | 15000.0000 | 5000.0000 | 1000.0000 | 97.28 MB |
| GroupBySelectMany | 1000000 | 150000 | 1,031.99 ms | 7.762 ms | 6.482 ms | 1,029.68 ms | 18000.0000 | 7000.0000 | 2000.0000 | 112.87 MB |
| GroupByMerge | 1000000 | 150000 | 1,053.82 ms | 15.343 ms | 15.069 ms | 1,047.47 ms | 18000.0000 | 7000.0000 | 2000.0000 | 112.88 MB |
| GroupBySelectMany | 1000000 | 200000 | 1,632.03 ms | 18.536 ms | 17.339 ms | 1,633.38 ms | 20000.0000 | 8000.0000 | 2000.0000 | 134.12 MB |
| GroupByMerge | 1000000 | 200000 | 1,673.68 ms | 27.066 ms | 25.318 ms | 1,667.54 ms | 20000.0000 | 8000.0000 | 2000.0000 | 134.12 MB |
Note that increasing the number of samples from 200,000 to 1,000,000 adds a fairly consistent 100 ms or so to the total time, whereas increasing the number of groups increases the time roughly quadratically, and the group count becomes the dominant factor at around 100,000 groups.
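As a rough check of the scaling, take the mean times of the GroupByMerge rows with 200,000 samples and treat the 10-group time (26.46 ms) as an approximate baseline for the work that does not depend on the number of groups. Writing T_n for the mean time with n groups:

$$
\begin{aligned}
\frac{T_{150\,000} - T_{10}}{T_{100\,000} - T_{10}} &= \frac{920.18 - 26.46}{468.15 - 26.46} = \frac{893.72}{441.69} \approx 2.02 \quad (\text{linear: } 1.5,\ \text{quadratic: } 2.25)\\
\frac{T_{200\,000} - T_{10}}{T_{100\,000} - T_{10}} &= \frac{1490.90 - 26.46}{468.15 - 26.46} = \frac{1464.44}{441.69} \approx 3.32 \quad (\text{linear: } 2,\ \text{quadratic: } 4)
\end{aligned}
$$

Both ratios fall between the linear and quadratic predictions and closer to the quadratic one, which is consistent with a quadratic term that dominates alongside some per-group costs that grow linearly. The baseline subtraction is only an approximation.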