@@ -16,6 +16,8 @@ public class SocketOutput : ISocketOutput
16
16
{
17
17
private const int _maxPendingWrites = 3 ;
18
18
private const int _maxBytesPreCompleted = 65536 ;
19
+ private const int _maxPooledWriteContexts = 16 ;
20
+ private const int _maxPooledBufferQueues = 16 ;
19
21
20
22
private readonly KestrelThread _thread ;
21
23
private readonly UvStreamHandle _socket ;
@@ -25,6 +27,7 @@ public class SocketOutput : ISocketOutput
25
27
26
28
// This locks access to to all of the below fields
27
29
private readonly object _lockObj = new object ( ) ;
30
+ private bool _isDisposed ;
28
31
29
32
// The number of write operations that have been scheduled so far
30
33
// but have not completed.
@@ -34,6 +37,7 @@ public class SocketOutput : ISocketOutput
34
37
private Exception _lastWriteError ;
35
38
private WriteContext _nextWriteContext ;
36
39
private readonly Queue < TaskCompletionSource < object > > _tasksPending ;
40
+ private readonly Queue < WriteContext > _writeContexts ;
37
41
38
42
public SocketOutput (
39
43
KestrelThread thread ,
@@ -47,7 +51,8 @@ public SocketOutput(
47
51
_connection = connection ;
48
52
_connectionId = connectionId ;
49
53
_log = log ;
50
- _tasksPending = new Queue < TaskCompletionSource < object > > ( ) ;
54
+ _tasksPending = new Queue < TaskCompletionSource < object > > ( 16 ) ;
55
+ _writeContexts = new Queue < WriteContext > ( _maxPooledWriteContexts ) ;
51
56
}
52
57
53
58
public Task WriteAsync (
@@ -71,7 +76,14 @@ public Task WriteAsync(
71
76
{
72
77
if ( _nextWriteContext == null )
73
78
{
74
- _nextWriteContext = new WriteContext ( this ) ;
79
+ if ( _writeContexts . Count > 0 )
80
+ {
81
+ _nextWriteContext = _writeContexts . Dequeue ( ) ;
82
+ }
83
+ else
84
+ {
85
+ _nextWriteContext = new WriteContext ( this ) ;
86
+ }
75
87
}
76
88
77
89
if ( buffer . Array != null )
@@ -180,13 +192,13 @@ private void WriteAllPending()
180
192
}
181
193
182
194
// This is called on the libuv event loop
183
- private void OnWriteCompleted ( Queue < ArraySegment < byte > > writtenBuffers , int status , Exception error )
195
+ private void OnWriteCompleted ( WriteContext write )
184
196
{
185
- _log . ConnectionWriteCallback ( _connectionId , status ) ;
197
+ var status = write . WriteStatus ;
186
198
187
- if ( error != null )
199
+ if ( write . WriteError != null )
188
200
{
189
- _lastWriteError = new IOException ( error . Message , error ) ;
201
+ _lastWriteError = new IOException ( write . WriteError . Message , write . WriteError ) ;
190
202
191
203
// Abort the connection for any failed write.
192
204
_connection . Abort ( ) ;
@@ -203,7 +215,7 @@ private void OnWriteCompleted(Queue<ArraySegment<byte>> writtenBuffers, int stat
203
215
_writesPending -- ;
204
216
}
205
217
206
- foreach ( var writeBuffer in writtenBuffers )
218
+ foreach ( var writeBuffer in write . Buffers )
207
219
{
208
220
// _numBytesPreCompleted can temporarily go negative in the event there are
209
221
// completed writes that we haven't triggered callbacks for yet.
@@ -230,16 +242,31 @@ private void OnWriteCompleted(Queue<ArraySegment<byte>> writtenBuffers, int stat
230
242
}
231
243
else
232
244
{
245
+ var error = _lastWriteError ;
233
246
// error is closure captured
234
247
ThreadPool . QueueUserWorkItem (
235
- ( o ) => ( ( TaskCompletionSource < object > ) o ) . SetException ( _lastWriteError ) ,
248
+ ( o ) => ( ( TaskCompletionSource < object > ) o ) . SetException ( error ) ,
236
249
tcs ) ;
237
250
}
238
251
}
239
252
253
+ if ( _writeContexts . Count < _maxPooledWriteContexts
254
+ && write . Buffers . Count <= _maxPooledBufferQueues
255
+ && ! _isDisposed )
256
+ {
257
+ write . Reset ( ) ;
258
+ _writeContexts . Enqueue ( write ) ;
259
+ }
260
+ else
261
+ {
262
+ write . Dispose ( ) ;
263
+ }
264
+
240
265
// Now that the while loop has completed the following invariants should hold true:
241
266
Debug . Assert ( _numBytesPreCompleted >= 0 ) ;
242
267
}
268
+
269
+ _log . ConnectionWriteCallback ( _connectionId , status ) ;
243
270
}
244
271
245
272
void ISocketOutput . Write ( ArraySegment < byte > buffer , bool immediate )
@@ -261,8 +288,24 @@ Task ISocketOutput.WriteAsync(ArraySegment<byte> buffer, bool immediate, Cancell
261
288
return WriteAsync ( buffer , immediate ) ;
262
289
}
263
290
264
- private class WriteContext
291
+ private void Dispose ( )
265
292
{
293
+ lock ( _lockObj )
294
+ {
295
+ _isDisposed = true ;
296
+
297
+ while ( _writeContexts . Count > 0 )
298
+ {
299
+ _writeContexts . Dequeue ( ) . Dispose ( ) ;
300
+ }
301
+ }
302
+
303
+ }
304
+
305
+ private class WriteContext : IDisposable
306
+ {
307
+ private const int BUFFER_COUNT = 4 ;
308
+
266
309
public SocketOutput Self ;
267
310
268
311
public Queue < ArraySegment < byte > > Buffers ;
@@ -272,12 +315,18 @@ private class WriteContext
272
315
public int WriteStatus ;
273
316
public Exception WriteError ;
274
317
318
+ private UvWriteReq _writeReq ;
319
+ public ArraySegment < byte > [ ] _segments ;
320
+
275
321
public int ShutdownSendStatus ;
276
322
277
323
public WriteContext ( SocketOutput self )
278
324
{
279
325
Self = self ;
280
- Buffers = new Queue < ArraySegment < byte > > ( ) ;
326
+ Buffers = new Queue < ArraySegment < byte > > ( _maxPooledBufferQueues ) ;
327
+ _segments = new ArraySegment < byte > [ BUFFER_COUNT ] ;
328
+ _writeReq = new UvWriteReq ( Self . _log ) ;
329
+ _writeReq . Init ( Self . _thread . Loop ) ;
281
330
}
282
331
283
332
/// <summary>
@@ -291,19 +340,24 @@ public void DoWriteIfNeeded()
291
340
return ;
292
341
}
293
342
294
- var buffers = new ArraySegment < byte > [ Buffers . Count ] ;
343
+ ArraySegment < byte > [ ] segments ;
344
+ if ( Buffers . Count > BUFFER_COUNT )
345
+ {
346
+ segments = new ArraySegment < byte > [ Buffers . Count ] ;
347
+ }
348
+ else
349
+ {
350
+ segments = _segments ;
351
+ }
295
352
296
353
var i = 0 ;
297
354
foreach ( var buffer in Buffers )
298
355
{
299
- buffers [ i ++ ] = buffer ;
356
+ segments [ i ++ ] = buffer ;
300
357
}
301
-
302
- var writeReq = new UvWriteReq ( Self . _log ) ;
303
- writeReq . Init ( Self . _thread . Loop ) ;
304
- writeReq . Write ( Self . _socket , new ArraySegment < ArraySegment < byte > > ( buffers ) , ( _writeReq , status , error , state ) =>
358
+
359
+ _writeReq . Write ( Self . _socket , new ArraySegment < ArraySegment < byte > > ( segments , 0 , Buffers . Count ) , ( _writeReq , status , error , state ) =>
305
360
{
306
- _writeReq . Dispose ( ) ;
307
361
var _this = ( WriteContext ) state ;
308
362
_this . WriteStatus = status ;
309
363
_this . WriteError = error ;
@@ -330,7 +384,7 @@ public void DoShutdownIfNeeded()
330
384
var _this = ( WriteContext ) state ;
331
385
_this . ShutdownSendStatus = status ;
332
386
333
- _this . Self . _log . ConnectionWroteFin ( Self . _connectionId , status ) ;
387
+ _this . Self . _log . ConnectionWroteFin ( _this . Self . _connectionId , status ) ;
334
388
335
389
_this . DoDisconnectIfNeeded ( ) ;
336
390
} , this ) ;
@@ -341,8 +395,14 @@ public void DoShutdownIfNeeded()
341
395
/// </summary>
342
396
public void DoDisconnectIfNeeded ( )
343
397
{
344
- if ( SocketDisconnect == false || Self . _socket . IsClosed )
398
+ if ( SocketDisconnect == false )
399
+ {
400
+ Complete ( ) ;
401
+ return ;
402
+ }
403
+ else if ( Self . _socket . IsClosed )
345
404
{
405
+ Self . Dispose ( ) ;
346
406
Complete ( ) ;
347
407
return ;
348
408
}
@@ -354,7 +414,28 @@ public void DoDisconnectIfNeeded()
354
414
355
415
public void Complete ( )
356
416
{
357
- Self . OnWriteCompleted ( Buffers , WriteStatus , WriteError ) ;
417
+ Self . OnWriteCompleted ( this ) ;
418
+ }
419
+
420
+ public void Reset ( )
421
+ {
422
+ Buffers . Clear ( ) ;
423
+ SocketDisconnect = false ;
424
+ SocketShutdownSend = false ;
425
+ WriteStatus = 0 ;
426
+ WriteError = null ;
427
+ ShutdownSendStatus = 0 ;
428
+
429
+ var segments = _segments ;
430
+ for ( var i = 0 ; i < segments . Length ; i ++ )
431
+ {
432
+ segments [ i ] = default ( ArraySegment < byte > ) ;
433
+ }
434
+ }
435
+
436
+ public void Dispose ( )
437
+ {
438
+ _writeReq . Dispose ( ) ;
358
439
}
359
440
}
360
441
}
0 commit comments