5
5
using System ;
6
6
using System . Collections . Generic ;
7
7
using System . Threading ;
8
+ using System . Diagnostics ;
8
9
9
10
namespace Microsoft . AspNet . Server . Kestrel . Http
10
11
{
11
12
public class SocketOutput : ISocketOutput
12
13
{
13
14
private const int _maxPendingWrites = 3 ;
14
- private const int _maxBytesBufferedBeforeThrottling = 65536 ;
15
+ private const int _maxBytesPreCompleted = 65536 ;
15
16
16
17
private readonly KestrelThread _thread ;
17
18
private readonly UvStreamHandle _socket ;
@@ -23,7 +24,7 @@ public class SocketOutput : ISocketOutput
23
24
// but have not completed.
24
25
private int _writesPending = 0 ;
25
26
26
- private int _numBytesBuffered = 0 ;
27
+ private int _numBytesPreCompleted = 0 ;
27
28
private Exception _lastWriteError ;
28
29
private WriteContext _nextWriteContext ;
29
30
private readonly Queue < CallbackContext > _callbacksPending ;
@@ -54,14 +55,17 @@ public void Write(ArraySegment<byte> buffer, Action<Exception, object> callback,
54
55
}
55
56
56
57
_nextWriteContext . Buffers . Enqueue ( buffer ) ;
57
- _numBytesBuffered += buffer . Count ;
58
58
59
59
// Complete the write task immediately if all previous write tasks have been completed,
60
60
// the buffers haven't grown too large, and the last write to the socket succeeded.
61
61
triggerCallbackNow = _lastWriteError == null &&
62
62
_callbacksPending . Count == 0 &&
63
- _numBytesBuffered <= _maxBytesBufferedBeforeThrottling ;
64
- if ( ! triggerCallbackNow )
63
+ _numBytesPreCompleted + buffer . Count <= _maxBytesPreCompleted ;
64
+ if ( triggerCallbackNow )
65
+ {
66
+ _numBytesPreCompleted += buffer . Count ;
67
+ }
68
+ else
65
69
{
66
70
_callbacksPending . Enqueue ( new CallbackContext
67
71
{
@@ -78,6 +82,7 @@ public void Write(ArraySegment<byte> buffer, Action<Exception, object> callback,
78
82
}
79
83
}
80
84
85
+ // Make sure we call user code outside of the lock.
81
86
if ( triggerCallbackNow )
82
87
{
83
88
callback ( null , state ) ;
@@ -164,15 +169,28 @@ private void OnWriteCompleted(Queue<ArraySegment<byte>> writtenBuffers, UvWriteR
164
169
165
170
foreach ( var writeBuffer in writtenBuffers )
166
171
{
167
- _numBytesBuffered -= writeBuffer . Count ;
172
+ // _numBytesPreCompleted can temporarily go negative in the event there are
173
+ // completed writes that we haven't triggered callbacks for yet.
174
+ _numBytesPreCompleted -= writeBuffer . Count ;
168
175
}
169
176
170
- var bytesLeftToBuffer = _maxBytesBufferedBeforeThrottling - _numBytesBuffered ;
177
+
178
+ // bytesLeftToBuffer can be greater than _maxBytesPreCompleted
179
+ // This allows large writes to complete once they've actually finished.
180
+ var bytesLeftToBuffer = _maxBytesPreCompleted - _numBytesPreCompleted ;
171
181
while ( _callbacksPending . Count > 0 &&
172
182
_callbacksPending . Peek ( ) . BytesToWrite <= bytesLeftToBuffer )
173
183
{
174
- TriggerCallback ( _callbacksPending . Dequeue ( ) ) ;
184
+ var callbackContext = _callbacksPending . Dequeue ( ) ;
185
+
186
+ _numBytesPreCompleted += callbackContext . BytesToWrite ;
187
+
188
+ TriggerCallback ( callbackContext ) ;
175
189
}
190
+
191
+ // Now that the while loop has completed the following invariants should hold true:
192
+ Trace . Assert ( _numBytesPreCompleted >= 0 ) ;
193
+ Trace . Assert ( _numBytesPreCompleted <= _maxBytesPreCompleted ) ;
176
194
}
177
195
178
196
req . Dispose ( ) ;
0 commit comments