@@ -29,13 +29,21 @@ public class SocketInput : ICriticalNotifyCompletion, IDisposable
29
29
30
30
private int _consumingState ;
31
31
32
+ private readonly BufferLengthConnectionController _bufferLengthConnectionController ;
33
+
32
34
public SocketInput ( MemoryPool memory , IThreadPool threadPool )
33
35
{
34
36
_memory = memory ;
35
37
_threadPool = threadPool ;
36
38
_awaitableState = _awaitableIsNotCompleted ;
37
39
}
38
40
41
+ public SocketInput ( MemoryPool memory , IThreadPool threadPool , long maxBufferLength , IConnectionControl connectionControl ,
42
+ KestrelThread connectionThread ) : this ( memory , threadPool )
43
+ {
44
+ _bufferLengthConnectionController = new BufferLengthConnectionController ( maxBufferLength , connectionControl , connectionThread ) ;
45
+ }
46
+
39
47
public bool RemoteIntakeFin { get ; set ; }
40
48
41
49
public bool IsCompleted => ReferenceEquals ( _awaitableState , _awaitableIsCompleted ) ;
@@ -58,6 +66,9 @@ public MemoryPoolBlock IncomingStart()
58
66
59
67
public void IncomingData ( byte [ ] buffer , int offset , int count )
60
68
{
69
+ // Must call Add() before bytes are available to consumer, to ensure that Length is >= 0
70
+ _bufferLengthConnectionController ? . Add ( count ) ;
71
+
61
72
if ( count > 0 )
62
73
{
63
74
if ( _tail == null )
@@ -85,6 +96,9 @@ public void IncomingData(byte[] buffer, int offset, int count)
85
96
86
97
public void IncomingComplete ( int count , Exception error )
87
98
{
99
+ // Must call Add() before bytes are available to consumer, to ensure that Length is >= 0
100
+ _bufferLengthConnectionController ? . Add ( count ) ;
101
+
88
102
if ( _pinned != null )
89
103
{
90
104
_pinned . End += count ;
@@ -167,10 +181,16 @@ public void ConsumingComplete(
167
181
168
182
if ( ! consumed . IsDefault )
169
183
{
184
+ var lengthConsumed = new MemoryPoolIterator ( _head ) . GetLength ( consumed ) ;
185
+
170
186
returnStart = _head ;
171
187
returnEnd = consumed . Block ;
172
188
_head = consumed . Block ;
173
189
_head . Start = consumed . Index ;
190
+
191
+ // Must call Subtract() after bytes have been freed, to avoid producer starting too early and growing
192
+ // buffer beyond max length.
193
+ _bufferLengthConnectionController ? . Subtract ( lengthConsumed ) ;
174
194
}
175
195
176
196
if ( ! examined . IsDefault &&
@@ -285,5 +305,72 @@ public void Dispose()
285
305
_head = null ;
286
306
_tail = null ;
287
307
}
308
+
309
+ private class BufferLengthConnectionController
310
+ {
311
+ private readonly long _maxLength ;
312
+ private readonly IConnectionControl _connectionControl ;
313
+ private readonly KestrelThread _connectionThread ;
314
+
315
+ private readonly object _lock = new object ( ) ;
316
+
317
+ private long _length ;
318
+ private bool _connectionPaused ;
319
+
320
+ public BufferLengthConnectionController ( long maxLength , IConnectionControl connectionControl , KestrelThread connectionThread )
321
+ {
322
+ _maxLength = maxLength ;
323
+ _connectionControl = connectionControl ;
324
+ _connectionThread = connectionThread ;
325
+ }
326
+
327
+ public long Length
328
+ {
329
+ get
330
+ {
331
+ return _length ;
332
+ }
333
+ set
334
+ {
335
+ // Caller should ensure that bytes are never consumed before the producer has called Add()
336
+ Debug . Assert ( value >= 0 ) ;
337
+
338
+ _length = value ;
339
+ }
340
+ }
341
+
342
+ public void Add ( int count )
343
+ {
344
+ // Add() should never be called while connection is paused, since ConnectionControl.Pause() runs on a libuv thread
345
+ // and should take effect immediately.
346
+ Debug . Assert ( ! _connectionPaused ) ;
347
+
348
+ lock ( _lock )
349
+ {
350
+ Length += count ;
351
+ if ( Length >= _maxLength )
352
+ {
353
+ _connectionPaused = true ;
354
+ _connectionControl . Pause ( ) ;
355
+ }
356
+ }
357
+ }
358
+
359
+ public void Subtract ( int count )
360
+ {
361
+ lock ( _lock )
362
+ {
363
+ Length -= count ;
364
+
365
+ if ( _connectionPaused && Length < _maxLength )
366
+ {
367
+ _connectionPaused = false ;
368
+ _connectionThread . Post (
369
+ ( connectionControl ) => ( ( IConnectionControl ) connectionControl ) . Resume ( ) ,
370
+ _connectionControl ) ;
371
+ }
372
+ }
373
+ }
374
+ }
288
375
}
289
376
}
0 commit comments