11// Licensed to the .NET Foundation under one or more agreements.
22// The .NET Foundation licenses this file to you under the MIT license.
33
4+ using System . Collections . Immutable ;
45using System . Threading . Channels ;
56using Aspire . Dashboard . Model ;
67
@@ -12,7 +13,7 @@ internal sealed class ViewModelProcessor
1213 private readonly Channel < ResourceChange > _incomingChannel ;
1314 private readonly CancellationToken _cancellationToken ;
1415 private readonly Dictionary < string , ResourceViewModel > _snapshot = [ ] ;
15- private readonly List < Channel < ResourceChange > > _subscribedChannels = [ ] ;
16+ private ImmutableHashSet < Channel < ResourceChange > > _outgoingChannels = [ ] ;
1617
1718 public ViewModelProcessor ( Channel < ResourceChange > incomingChannel , CancellationToken cancellationToken )
1819 {
@@ -27,19 +28,17 @@ public ViewModelMonitor GetMonitor()
2728 lock ( _syncLock )
2829 {
2930 var channel = Channel . CreateUnbounded < ResourceChange > ( ) ;
30- _subscribedChannels . Add ( channel ) ;
31+
32+ ImmutableInterlocked . Update ( ref _outgoingChannels , static ( set , channel ) => set . Add ( channel ) , channel ) ;
3133
3234 return new ViewModelMonitor (
3335 Snapshot : _snapshot . Values . ToList ( ) ,
3436 Watch : new ChangeEnumerable ( channel , RemoveChannel ) ) ;
3537 }
36- }
3738
38- private void RemoveChannel ( Channel < ResourceChange > channel )
39- {
40- lock ( _syncLock )
39+ void RemoveChannel ( Channel < ResourceChange > channel )
4140 {
42- _subscribedChannels . Remove ( channel ) ;
41+ ImmutableInterlocked . Update ( ref _outgoingChannels , static ( set , channel ) => set . Remove ( channel ) , channel ) ;
4342 }
4443 }
4544
@@ -96,11 +95,11 @@ private async Task ProcessChanges()
9695 {
9796 await foreach ( var change in _incomingChannel . Reader . ReadAllAsync ( _cancellationToken ) )
9897 {
99- List < Channel < ResourceChange > > outgoingChannels ;
98+ var ( changeType , resource ) = change ;
99+
100100 lock ( _syncLock )
101101 {
102- var resource = change . Resource ;
103- switch ( change . ObjectChangeType )
102+ switch ( changeType )
104103 {
105104 case ObjectChangeType . Added :
106105 _snapshot . Add ( resource . Name , resource ) ;
@@ -114,11 +113,9 @@ private async Task ProcessChanges()
114113 _snapshot . Remove ( resource . Name ) ;
115114 break ;
116115 }
117-
118- outgoingChannels = _subscribedChannels . ToList ( ) ;
119116 }
120117
121- foreach ( var channel in outgoingChannels )
118+ foreach ( var channel in _outgoingChannels )
122119 {
123120 await channel . Writer . WriteAsync ( change , _cancellationToken ) . ConfigureAwait ( false ) ;
124121 }
0 commit comments