@@ -32,13 +32,21 @@ public class SocketInput : ICriticalNotifyCompletion, IDisposable
32
32
private bool _consuming ;
33
33
private bool _disposed ;
34
34
35
+ private readonly BufferLengthConnectionController _bufferLengthConnectionController ;
36
+
35
37
public SocketInput ( MemoryPool memory , IThreadPool threadPool )
36
38
{
37
39
_memory = memory ;
38
40
_threadPool = threadPool ;
39
41
_awaitableState = _awaitableIsNotCompleted ;
40
42
}
41
43
44
+ public SocketInput ( MemoryPool memory , IThreadPool threadPool , int maxBufferLength , IConnectionControl connectionControl ,
45
+ KestrelThread connectionThread ) : this ( memory , threadPool )
46
+ {
47
+ _bufferLengthConnectionController = new BufferLengthConnectionController ( maxBufferLength , connectionControl , connectionThread ) ;
48
+ }
49
+
42
50
public bool RemoteIntakeFin { get ; set ; }
43
51
44
52
public bool IsCompleted => ReferenceEquals ( _awaitableState , _awaitableIsCompleted ) ;
@@ -63,6 +71,9 @@ public void IncomingData(byte[] buffer, int offset, int count)
63
71
{
64
72
lock ( _sync )
65
73
{
74
+ // Must call Add() before bytes are available to consumer, to ensure that Length is >= 0
75
+ _bufferLengthConnectionController ? . Add ( count ) ;
76
+
66
77
if ( count > 0 )
67
78
{
68
79
if ( _tail == null )
@@ -93,6 +104,9 @@ public void IncomingComplete(int count, Exception error)
93
104
{
94
105
lock ( _sync )
95
106
{
107
+ // Must call Add() before bytes are available to consumer, to ensure that Length is >= 0
108
+ _bufferLengthConnectionController ? . Add ( count ) ;
109
+
96
110
if ( _pinned != null )
97
111
{
98
112
_pinned . End += count ;
@@ -189,10 +203,16 @@ public void ConsumingComplete(
189
203
{
190
204
if ( ! consumed . IsDefault )
191
205
{
206
+ var lengthConsumed = new MemoryPoolIterator ( _head ) . GetLength ( consumed ) ;
207
+
192
208
returnStart = _head ;
193
209
returnEnd = consumed . Block ;
194
210
_head = consumed . Block ;
195
211
_head . Start = consumed . Index ;
212
+
213
+ // Must call Subtract() after bytes have been freed, to avoid producer starting too early and growing
214
+ // buffer beyond max length.
215
+ _bufferLengthConnectionController ? . Subtract ( lengthConsumed ) ;
196
216
}
197
217
198
218
if ( ! examined . IsDefault &&
@@ -321,5 +341,73 @@ private static void ReturnBlocks(MemoryPoolBlock block, MemoryPoolBlock end)
321
341
returnBlock . Pool . Return ( returnBlock ) ;
322
342
}
323
343
}
344
+
345
+ private class BufferLengthConnectionController
346
+ {
347
+ private readonly int _maxLength ;
348
+ private readonly IConnectionControl _connectionControl ;
349
+ private readonly KestrelThread _connectionThread ;
350
+
351
+ private readonly object _lock = new object ( ) ;
352
+
353
+ private int _length ;
354
+ private bool _connectionPaused ;
355
+
356
+ public BufferLengthConnectionController ( int maxLength , IConnectionControl connectionControl , KestrelThread connectionThread )
357
+ {
358
+ _maxLength = maxLength ;
359
+ _connectionControl = connectionControl ;
360
+ _connectionThread = connectionThread ;
361
+ }
362
+
363
+ public int Length
364
+ {
365
+ get
366
+ {
367
+ return _length ;
368
+ }
369
+ set
370
+ {
371
+ // Caller should ensure that bytes are never consumed before the producer has called Add()
372
+ Debug . Assert ( value >= 0 ) ;
373
+
374
+ _length = value ;
375
+ }
376
+ }
377
+
378
+ public void Add ( int count )
379
+ {
380
+ lock ( _lock )
381
+ {
382
+ // Add() should never be called while connection is paused, since ConnectionControl.Pause() runs on a libuv thread
383
+ // and should take effect immediately.
384
+ Debug . Assert ( ! _connectionPaused ) ;
385
+
386
+ Length += count ;
387
+ if ( Length >= _maxLength )
388
+ {
389
+ _connectionPaused = true ;
390
+ _connectionControl . Pause ( ) ;
391
+ }
392
+ }
393
+ }
394
+
395
+ public void Subtract ( int count )
396
+ {
397
+ lock ( _lock )
398
+ {
399
+ Length -= count ;
400
+
401
+ if ( _connectionPaused && Length < _maxLength )
402
+ {
403
+ _connectionPaused = false ;
404
+ _connectionThread . Post (
405
+ ( connectionControl ) => ( ( IConnectionControl ) connectionControl ) . Resume ( ) ,
406
+ _connectionControl ) ;
407
+ }
408
+ }
409
+ }
410
+ }
411
+
324
412
}
325
413
}
0 commit comments