2020using System . Collections . Concurrent ;
2121using System . Text . Json ;
2222using System . Text . Json . Serialization . Metadata ;
23- using System . Threading . Channels ;
23+ using OpenQA . Selenium . BiDi . Session ;
2424using OpenQA . Selenium . Internal . Logging ;
2525
2626namespace OpenQA . Selenium . BiDi ;
@@ -29,107 +29,33 @@ internal sealed class Broker : IAsyncDisposable
2929{
3030 private readonly ILogger _logger = Internal . Logging . Log . GetLogger < Broker > ( ) ;
3131
32- private readonly BiDi _bidi ;
3332 private readonly ITransport _transport ;
33+ private readonly EventDispatcher _eventDispatcher ;
34+ private readonly IBiDi _bidi ;
3435
3536 private readonly ConcurrentDictionary < long , CommandInfo > _pendingCommands = new ( ) ;
36- private readonly Channel < EventInfo > _pendingEvents = Channel . CreateUnbounded < EventInfo > ( new ( )
37- {
38- SingleReader = true ,
39- SingleWriter = true
40- } ) ;
41- private readonly Dictionary < string , JsonTypeInfo > _eventTypesMap = [ ] ;
42-
43- private readonly ConcurrentDictionary < string , List < EventHandler > > _eventHandlers = new ( ) ;
4437
4538 private long _currentCommandId ;
4639
4740 private static readonly TaskFactory _myTaskFactory = new ( CancellationToken . None , TaskCreationOptions . DenyChildAttach , TaskContinuationOptions . None , TaskScheduler . Default ) ;
4841
49- private Task ? _receivingMessageTask ;
50- private Task ? _eventEmitterTask ;
51- private CancellationTokenSource ? _receiveMessagesCancellationTokenSource ;
42+ private readonly Task _receivingMessageTask ;
43+ private readonly CancellationTokenSource _receiveMessagesCancellationTokenSource ;
5244
53- internal Broker ( BiDi bidi , Uri url )
45+ public Broker ( ITransport transport , IBiDi bidi , Func < ISessionModule > sessionProvider )
5446 {
47+ _transport = transport ;
5548 _bidi = bidi ;
56- _transport = new WebSocketTransport ( url ) ;
57- }
58-
59- public async Task ConnectAsync ( CancellationToken cancellationToken )
60- {
61- await _transport . ConnectAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
49+ _eventDispatcher = new EventDispatcher ( sessionProvider ) ;
6250
6351 _receiveMessagesCancellationTokenSource = new CancellationTokenSource ( ) ;
6452 _receivingMessageTask = _myTaskFactory . StartNew ( async ( ) => await ReceiveMessagesAsync ( _receiveMessagesCancellationTokenSource . Token ) , TaskCreationOptions . LongRunning ) . Unwrap ( ) ;
65- _eventEmitterTask = _myTaskFactory . StartNew ( ProcessEventsAwaiterAsync ) . Unwrap ( ) ;
6653 }
6754
68- private async Task ReceiveMessagesAsync ( CancellationToken cancellationToken )
69- {
70- try
71- {
72- while ( ! cancellationToken . IsCancellationRequested )
73- {
74- var data = await _transport . ReceiveAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
75-
76- try
77- {
78- ProcessReceivedMessage ( data ) ;
79- }
80- catch ( Exception ex )
81- {
82- if ( _logger . IsEnabled ( LogEventLevel . Error ) )
83- {
84- _logger . Error ( $ "Unhandled error occurred while processing remote message: { ex } ") ;
85- }
86- }
87- }
88- }
89- catch ( Exception ex ) when ( ex is not OperationCanceledException )
90- {
91- if ( _logger . IsEnabled ( LogEventLevel . Error ) )
92- {
93- _logger . Error ( $ "Unhandled error occurred while receiving remote messages: { ex } ") ;
94- }
95-
96- throw ;
97- }
98- }
99-
100- private async Task ProcessEventsAwaiterAsync ( )
55+ public Task < Subscription > SubscribeAsync < TEventArgs > ( string eventName , EventHandler eventHandler , SubscriptionOptions ? options , JsonTypeInfo < TEventArgs > jsonTypeInfo , CancellationToken cancellationToken )
56+ where TEventArgs : EventArgs
10157 {
102- var reader = _pendingEvents . Reader ;
103- while ( await reader . WaitToReadAsync ( ) . ConfigureAwait ( false ) )
104- {
105- while ( reader . TryRead ( out var result ) )
106- {
107- try
108- {
109- if ( _eventHandlers . TryGetValue ( result . Method , out var eventHandlers ) )
110- {
111- if ( eventHandlers is not null )
112- {
113- foreach ( var handler in eventHandlers . ToArray ( ) ) // copy handlers avoiding modified collection while iterating
114- {
115- var args = result . Params ;
116-
117- args . BiDi = _bidi ;
118-
119- await handler . InvokeAsync ( args ) . ConfigureAwait ( false ) ;
120- }
121- }
122- }
123- }
124- catch ( Exception ex )
125- {
126- if ( _logger . IsEnabled ( LogEventLevel . Error ) )
127- {
128- _logger . Error ( $ "Unhandled error processing BiDi event handler: { ex } ") ;
129- }
130- }
131- }
132- }
58+ return _eventDispatcher . SubscribeAsync ( eventName , eventHandler , options , jsonTypeInfo , cancellationToken ) ;
13359 }
13460
13561 public async Task < TResult > ExecuteCommandAsync < TCommand , TResult > ( TCommand command , CommandOptions ? options , JsonTypeInfo < TCommand > jsonCommandTypeInfo , JsonTypeInfo < TResult > jsonResultTypeInfo , CancellationToken cancellationToken )
@@ -157,40 +83,23 @@ public async Task<TResult> ExecuteCommandAsync<TCommand, TResult>(TCommand comma
15783 return ( TResult ) await tcs . Task . ConfigureAwait ( false ) ;
15884 }
15985
160- public async Task < Subscription > SubscribeAsync < TEventArgs > ( string eventName , EventHandler eventHandler , SubscriptionOptions ? options , JsonTypeInfo < TEventArgs > jsonTypeInfo , CancellationToken cancellationToken )
161- where TEventArgs : EventArgs
162- {
163- _eventTypesMap [ eventName ] = jsonTypeInfo ;
164-
165- var handlers = _eventHandlers . GetOrAdd ( eventName , ( a ) => [ ] ) ;
166-
167- var subscribeResult = await _bidi . SessionModule . SubscribeAsync ( [ eventName ] , new ( ) { Contexts = options ? . Contexts , UserContexts = options ? . UserContexts } , cancellationToken ) . ConfigureAwait ( false ) ;
168-
169- handlers . Add ( eventHandler ) ;
170-
171- return new Subscription ( subscribeResult . Subscription , this , eventHandler ) ;
172- }
173-
174- public async Task UnsubscribeAsync ( Subscription subscription , CancellationToken cancellationToken )
175- {
176- var eventHandlers = _eventHandlers [ subscription . EventHandler . EventName ] ;
177-
178- eventHandlers . Remove ( subscription . EventHandler ) ;
179-
180- await _bidi . SessionModule . UnsubscribeAsync ( [ subscription . SubscriptionId ] , null , cancellationToken ) . ConfigureAwait ( false ) ;
181- }
182-
18386 public async ValueTask DisposeAsync ( )
18487 {
185- _pendingEvents . Writer . Complete ( ) ;
88+ _receiveMessagesCancellationTokenSource . Cancel ( ) ;
18689
187- _receiveMessagesCancellationTokenSource ? . Cancel ( ) ;
90+ await _eventDispatcher . DisposeAsync ( ) . ConfigureAwait ( false ) ;
18891
189- if ( _eventEmitterTask is not null )
92+ try
93+ {
94+ await _receivingMessageTask . ConfigureAwait ( false ) ;
95+ }
96+ catch ( OperationCanceledException ) when ( _receiveMessagesCancellationTokenSource . IsCancellationRequested )
19097 {
191- await _eventEmitterTask . ConfigureAwait ( false ) ;
98+ // Expected when cancellation is requested, ignore.
19299 }
193100
101+ _receiveMessagesCancellationTokenSource . Dispose ( ) ;
102+
194103 _transport . Dispose ( ) ;
195104
196105 GC . SuppressFinalize ( this ) ;
@@ -204,7 +113,8 @@ private void ProcessReceivedMessage(byte[]? data)
204113 string ? error = default ;
205114 string ? message = default ;
206115 Utf8JsonReader resultReader = default ;
207- Utf8JsonReader paramsReader = default ;
116+ long paramsStartIndex = 0 ;
117+ long paramsEndIndex = 0 ;
208118
209119 Utf8JsonReader reader = new ( new ReadOnlySpan < byte > ( data ) ) ;
210120 reader . Read ( ) ;
@@ -235,7 +145,7 @@ private void ProcessReceivedMessage(byte[]? data)
235145 break ;
236146
237147 case "params" :
238- paramsReader = reader ; // snapshot
148+ paramsStartIndex = reader . TokenStartIndex ;
239149 break ;
240150
241151 case "error" :
@@ -247,21 +157,29 @@ private void ProcessReceivedMessage(byte[]? data)
247157 break ;
248158 }
249159
250- reader . Skip ( ) ;
160+ if ( propertyName == "params" )
161+ {
162+ reader . Skip ( ) ;
163+ paramsEndIndex = reader . BytesConsumed ;
164+ }
165+ else
166+ {
167+ reader . Skip ( ) ;
168+ }
251169 reader . Read ( ) ;
252170 }
253171
254172 switch ( type )
255173 {
256174 case "success" :
257- if ( id is null ) throw new JsonException ( "The remote end responded with 'success' message type, but missed required 'id' property." ) ;
175+ if ( id is null ) throw new BiDiException ( "The remote end responded with 'success' message type, but missed required 'id' property." ) ;
258176
259177 if ( _pendingCommands . TryGetValue ( id . Value , out var command ) )
260178 {
261179 try
262180 {
263181 var commandResult = JsonSerializer . Deserialize ( ref resultReader , command . JsonResultTypeInfo )
264- ?? throw new JsonException ( "Remote end returned null command result in the 'result' property." ) ;
182+ ?? throw new BiDiException ( "Remote end returned null command result in the 'result' property." ) ;
265183
266184 command . TaskCompletionSource . SetResult ( ( EmptyResult ) commandResult ) ;
267185 }
@@ -282,25 +200,13 @@ private void ProcessReceivedMessage(byte[]? data)
282200 break ;
283201
284202 case "event" :
285- if ( method is null ) throw new JsonException ( "The remote end responded with 'event' message type, but missed required 'method' property." ) ;
286-
287- if ( _eventTypesMap . TryGetValue ( method , out var eventInfo ) )
288- {
289- var eventArgs = ( EventArgs ) JsonSerializer . Deserialize ( ref paramsReader , eventInfo ) ! ;
290-
291- eventArgs . BiDi = _bidi ;
292-
293- _pendingEvents . Writer . TryWrite ( new EventInfo ( method , eventArgs ) ) ;
294- }
295- else
296- {
297- throw new BiDiException ( $ "The remote end responded with 'event' message type, but no event type mapping for method '{ method } ' was found.") ;
298- }
299-
203+ if ( method is null ) throw new BiDiException ( "The remote end responded with 'event' message type, but missed required 'method' property." ) ;
204+ var paramsJsonData = new ReadOnlyMemory < byte > ( data , ( int ) paramsStartIndex , ( int ) ( paramsEndIndex - paramsStartIndex ) ) ;
205+ _eventDispatcher . EnqueueEvent ( method , paramsJsonData , _bidi ) ;
300206 break ;
301207
302208 case "error" :
303- if ( id is null ) throw new JsonException ( "The remote end responded with 'error' message type, but missed required 'id' property." ) ;
209+ if ( id is null ) throw new BiDiException ( "The remote end responded with 'error' message type, but missed required 'id' property." ) ;
304210
305211 if ( _pendingCommands . TryGetValue ( id . Value , out var errorCommand ) )
306212 {
@@ -316,7 +222,37 @@ private void ProcessReceivedMessage(byte[]? data)
316222 }
317223 }
318224
319- private readonly record struct CommandInfo ( TaskCompletionSource < EmptyResult > TaskCompletionSource , JsonTypeInfo JsonResultTypeInfo ) ;
225+ private async Task ReceiveMessagesAsync ( CancellationToken cancellationToken )
226+ {
227+ try
228+ {
229+ while ( ! cancellationToken . IsCancellationRequested )
230+ {
231+ var data = await _transport . ReceiveAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
320232
321- private readonly record struct EventInfo ( string Method , EventArgs Params ) ;
233+ try
234+ {
235+ ProcessReceivedMessage ( data ) ;
236+ }
237+ catch ( Exception ex )
238+ {
239+ if ( _logger . IsEnabled ( LogEventLevel . Error ) )
240+ {
241+ _logger . Error ( $ "Unhandled error occurred while processing remote message: { ex } ") ;
242+ }
243+ }
244+ }
245+ }
246+ catch ( Exception ex ) when ( ex is not OperationCanceledException )
247+ {
248+ if ( _logger . IsEnabled ( LogEventLevel . Error ) )
249+ {
250+ _logger . Error ( $ "Unhandled error occurred while receiving remote messages: { ex } ") ;
251+ }
252+
253+ throw ;
254+ }
255+ }
256+
257+ private readonly record struct CommandInfo ( TaskCompletionSource < EmptyResult > TaskCompletionSource , JsonTypeInfo JsonResultTypeInfo ) ;
322258}
0 commit comments