3
3
using System ;
4
4
using System . Collections . Generic ;
5
5
using System . Diagnostics ;
6
- using System . Net ;
7
6
using System . Net . Http ;
8
7
using System . Net . WebSockets ;
9
8
using System . Threading ;
@@ -31,12 +30,9 @@ public TransportRouter(HttpClient httpClient, Func<IClientWebSocket> clientWebSo
31
30
HttpTransport _httpTransport ;
32
31
WebSocketTransport _webSocketTransport ;
33
32
CancellationTokenSource _pollingTokenSource ;
34
- CancellationToken _pollingToken ;
35
33
string _httpUri ;
36
- //int _pingInterval;
37
34
OpenedMessage _openedMessage ;
38
35
CancellationTokenSource _pingTokenSource ;
39
- CancellationToken _pingToken ;
40
36
DateTime _pingTime ;
41
37
42
38
public Uri ServerUri { get ; set ; }
@@ -65,60 +61,6 @@ public async Task ConnectAsync()
65
61
{
66
62
await ConnectByPollingAsync ( ) . ConfigureAwait ( false ) ;
67
63
}
68
-
69
- // if (_webSocketTransport != null)
70
- // {
71
- // _webSocketTransport.Dispose();
72
- // }
73
- //Handshake:
74
- // Uri uri = UriConverter.GetHandshakeUri(ServerUri, EIO, _options.Path, _options.Query);
75
-
76
- // var req = new HttpRequestMessage(HttpMethod.Get, uri);
77
- // SetHeaders(req);
78
-
79
- // var resMsg = await _httpClient.SendAsync(req, new CancellationTokenSource(_options.ConnectionTimeout).Token).ConfigureAwait(false);
80
- // if (!resMsg.IsSuccessStatusCode)
81
- // {
82
- // if (resMsg.StatusCode == HttpStatusCode.NotFound)
83
- // {
84
- // string errMsg = await resMsg.Content.ReadAsStringAsync().ConfigureAwait(false);
85
- // if (errMsg.Contains("Transport unknown"))
86
- // {
87
-
88
- // }
89
- // }
90
- // throw new HttpRequestException($"Response status code does not indicate success: {resMsg.StatusCode}");
91
- // }
92
- // string text = await resMsg.Content.ReadAsStringAsync().ConfigureAwait(false);
93
- // var openedMessage = MessageFactory.CreateOpenedMessage(text);
94
-
95
- // if (openedMessage.EIO == 3 && EIO == 4)
96
- // {
97
- // EIO = 3;
98
- // goto Handshake;
99
- // }
100
-
101
- // Sid = openedMessage.Sid;
102
- // EIO = openedMessage.EIO;
103
- // uri = UriConverter.GetHandshakeUri(ServerUri, EIO, _options.Path, _options.Query);
104
- // _pingInterval = openedMessage.PingInterval;
105
- // if (openedMessage.Upgrades.Contains("websocket") && _options.AutoUpgrade)
106
- // {
107
- // _clientWebSocket = _clientWebSocketProvider();
108
- // _webSocketTransport = new WebSocketTransport(_clientWebSocket, EIO)
109
- // {
110
- // ConnectionTimeout = _options.ConnectionTimeout
111
- // };
112
- // await WebSocketConnectAsync().ConfigureAwait(false);
113
- // Protocol = TransportProtocol.WebSocket;
114
- // }
115
- // else
116
- // {
117
- // _httpUri = uri + "&sid=" + Sid;
118
- // _httpTransport = new HttpTransport(_httpClient, EIO);
119
- // await HttpConnectAsync().ConfigureAwait(false);
120
- // Protocol = TransportProtocol.Polling;
121
- // }
122
64
}
123
65
124
66
private async Task ConnectByWebsocketAsync ( )
@@ -140,10 +82,12 @@ private async Task ConnectByWebsocketAsync()
140
82
{
141
83
ConnectionTimeout = _options . ConnectionTimeout
142
84
} ;
143
- await _webSocketTransport . ConnectAsync ( uri ) . ConfigureAwait ( false ) ;
144
85
_webSocketTransport . OnTextReceived = OnTextReceived ;
145
86
_webSocketTransport . OnBinaryReceived = OnBinaryReceived ;
146
87
_webSocketTransport . OnAborted = OnAborted ;
88
+ Debug . WriteLine ( $ "[Websocket] Connecting") ;
89
+ await _webSocketTransport . ConnectAsync ( uri ) . ConfigureAwait ( false ) ;
90
+ Debug . WriteLine ( $ "[Websocket] Connected") ;
147
91
}
148
92
149
93
private async Task ConnectByPollingAsync ( )
@@ -163,56 +107,25 @@ private async Task ConnectByPollingAsync()
163
107
OnBinaryReceived = OnBinaryReceived
164
108
} ;
165
109
await _httpTransport . SendAsync ( req , new CancellationTokenSource ( _options . ConnectionTimeout ) . Token ) . ConfigureAwait ( false ) ;
166
- //_openedMessage = MessageFactory.CreateOpenedMessage(text);
167
- //_httpUri = uri + "&sid=" + _openedMessage.Sid;
168
- await HttpConnectAsync ( ) . ConfigureAwait ( false ) ;
169
- }
170
-
171
- //private async Task WebSocketConnectAsync()
172
- //{
173
- // Uri uri = UriConverter.GetWebSocketUri(ServerUri, EIO, _options.Path, _options.Query, Sid);
174
- // await _webSocketTransport.ConnectAsync(uri).ConfigureAwait(false);
175
- // _webSocketTransport.OnTextReceived = OnWebSocketTextReceived;
176
- // _webSocketTransport.OnBinaryReceived = OnBinaryReceived;
177
- // _webSocketTransport.OnAborted = OnAborted;
178
- // await _webSocketTransport.SendAsync("2probe", CancellationToken.None);
179
- //}
180
-
181
- private async Task HttpConnectAsync ( )
182
- {
110
+ if ( _pollingTokenSource != null )
111
+ {
112
+ _pollingTokenSource . Cancel ( ) ;
113
+ }
183
114
_pollingTokenSource = new CancellationTokenSource ( ) ;
184
- _pollingToken = _pollingTokenSource . Token ;
185
115
186
- StartPolling ( ) ;
187
-
188
- //if (!(EIO == 3 && string.IsNullOrEmpty(Namespace)))
189
- //{
190
- // var msg = new ConnectedMessage
191
- // {
192
- // Namespace = Namespace,
193
- // Eio = EIO,
194
- // Protocol = TransportProtocol.Polling,
195
- // Query = _options.Query
196
- // };
197
- // await SendAsync(msg.Write(), CancellationToken.None).ConfigureAwait(false);
198
- //}
116
+ StartPolling ( _pollingTokenSource . Token ) ;
199
117
}
200
118
201
- private void StartPolling ( )
119
+ private void StartPolling ( CancellationToken cancellationToken )
202
120
{
203
121
Task . Factory . StartNew ( async ( ) =>
204
122
{
205
- while ( ! _pollingToken . IsCancellationRequested )
123
+ while ( ! cancellationToken . IsCancellationRequested )
206
124
{
207
125
try
208
126
{
209
127
await _httpTransport . GetAsync ( _httpUri , CancellationToken . None ) . ConfigureAwait ( false ) ;
210
128
}
211
- catch ( TaskCanceledException e )
212
- {
213
- Debug . WriteLine ( e ) ;
214
- break ;
215
- }
216
129
catch
217
130
{
218
131
OnTransportClosed ( ) ;
@@ -222,53 +135,37 @@ private void StartPolling()
222
135
} , TaskCreationOptions . LongRunning ) ;
223
136
}
224
137
225
- private async Task PingAsync ( )
138
+ private void StartPing ( CancellationToken cancellationToken )
226
139
{
227
- Debug . WriteLine ( $ "PingInterval : { _openedMessage . PingInterval } ") ;
228
- while ( ! _pingToken . IsCancellationRequested )
140
+ Debug . WriteLine ( $ "[Ping] Interval : { _openedMessage . PingInterval } ") ;
141
+ Task . Factory . StartNew ( async ( ) =>
229
142
{
230
- await Task . Delay ( _openedMessage . PingInterval ) ;
231
- try
232
- {
233
- var ping = new PingMessage ( ) ;
234
- Debug . WriteLine ( $ "Send Ping") ;
235
- await SendAsync ( ping , CancellationToken . None ) . ConfigureAwait ( false ) ;
236
- _pingTime = DateTime . Now ;
237
- OnMessageReceived ( ping ) ;
238
- }
239
- catch
143
+ while ( ! cancellationToken . IsCancellationRequested )
240
144
{
241
- OnTransportClosed ( ) ;
242
- throw ;
145
+ await Task . Delay ( _openedMessage . PingInterval ) ;
146
+ if ( cancellationToken . IsCancellationRequested )
147
+ {
148
+ break ;
149
+ }
150
+ try
151
+ {
152
+ var ping = new PingMessage ( ) ;
153
+ Debug . WriteLine ( $ "[Ping] Sending") ;
154
+ await SendAsync ( ping , CancellationToken . None ) . ConfigureAwait ( false ) ;
155
+ Debug . WriteLine ( $ "[Ping] Has been sent") ;
156
+ _pingTime = DateTime . Now ;
157
+ OnMessageReceived ( ping ) ;
158
+ }
159
+ catch ( Exception e )
160
+ {
161
+ Debug . WriteLine ( $ "[Ping] Failed to send, { e . Message } ") ;
162
+ OnTransportClosed ( ) ;
163
+ throw ;
164
+ }
243
165
}
244
- }
166
+ } , TaskCreationOptions . LongRunning ) ;
245
167
}
246
168
247
- //private async void OnWebSocketTextReceived(string text)
248
- //{
249
- // if (text == "3probe")
250
- // {
251
- // await _webSocketTransport.SendAsync("5", CancellationToken.None);
252
-
253
- // if (EIO == 3 && string.IsNullOrEmpty(Namespace))
254
- // {
255
- // return;
256
- // }
257
- // var msg = new ConnectedMessage
258
- // {
259
- // Namespace = Namespace,
260
- // Eio = EIO,
261
- // Sid = Sid,
262
- // Protocol = TransportProtocol.WebSocket,
263
- // Query = _options.Query
264
- // };
265
- // await _webSocketTransport.SendAsync(msg.Write(), CancellationToken.None);
266
- // }
267
- // else
268
- // {
269
- // OnTextReceived(text);
270
- // }
271
- //}
272
169
private async Task OnOpened ( OpenedMessage msg )
273
170
{
274
171
_openedMessage = msg ;
@@ -314,10 +211,9 @@ private async void OnTextReceived(string text)
314
211
if ( _pingTokenSource != null )
315
212
{
316
213
_pingTokenSource . Cancel ( ) ;
317
- _pingTokenSource = new CancellationTokenSource ( ) ;
318
- _pingToken = _pingTokenSource . Token ;
319
214
}
320
- _ = Task . Factory . StartNew ( PingAsync , TaskCreationOptions . LongRunning ) ;
215
+ _pingTokenSource = new CancellationTokenSource ( ) ;
216
+ StartPing ( _pingTokenSource . Token ) ;
321
217
}
322
218
else
323
219
{
@@ -376,6 +272,7 @@ private void OnBinaryReceived(byte[] bytes)
376
272
377
273
private void OnAborted ( Exception e )
378
274
{
275
+ Debug . WriteLine ( $ "[Websocket] Aborted, " + e . Message ) ;
379
276
OnTransportClosed ( ) ;
380
277
}
381
278
@@ -409,6 +306,10 @@ public async Task DisconnectAsync()
409
306
}
410
307
_clientWebSocket . Dispose ( ) ;
411
308
}
309
+ if ( _pingTokenSource != null )
310
+ {
311
+ _pingTokenSource . Cancel ( ) ;
312
+ }
412
313
}
413
314
414
315
private async Task SendAsync ( string text , CancellationToken cancellationToken )
@@ -450,10 +351,6 @@ public void Dispose()
450
351
{
451
352
_webSocketTransport . Dispose ( ) ;
452
353
}
453
- if ( _pingTokenSource != null )
454
- {
455
- _pingTokenSource . Cancel ( ) ;
456
- }
457
354
}
458
355
}
459
356
}
0 commit comments