@@ -12,12 +12,12 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
12
12
public class SocketOutput : ISocketOutput
13
13
{
14
14
private const int _maxPendingWrites = 3 ;
15
- private const int _maxBytesBufferedBeforeThrottling = 65536 / 8 ;
15
+ private const int _maxBytesBufferedBeforeThrottling = 65536 ;
16
16
17
17
private readonly KestrelThread _thread ;
18
18
private readonly UvStreamHandle _socket ;
19
19
20
- // This locks all access to to all the below
20
+ // This locks access to to all of the below fields
21
21
private readonly object _lockObj = new object ( ) ;
22
22
23
23
// The number of write operations that have been scheduled so far
@@ -45,17 +45,7 @@ public void Write(ArraySegment<byte> buffer, Action<Exception, object> callback,
45
45
46
46
KestrelTrace . Log . ConnectionWrite ( 0 , buffer . Count ) ;
47
47
48
- var writeOp = new WriteOperation
49
- {
50
- Buffer = buffer
51
- } ;
52
-
53
- var callbackContext = new CallbackContext
54
- {
55
- Callback = callback ,
56
- State = state ,
57
- BytesToWrite = buffer . Count
58
- } ;
48
+ bool triggerCallbackNow = false ;
59
49
60
50
lock ( _lockObj )
61
51
{
@@ -64,20 +54,22 @@ public void Write(ArraySegment<byte> buffer, Action<Exception, object> callback,
64
54
_nextWriteContext = new WriteContext ( this ) ;
65
55
}
66
56
67
- _nextWriteContext . Operations . Enqueue ( writeOp ) ;
57
+ _nextWriteContext . Buffers . Enqueue ( buffer ) ;
68
58
_numBytesBuffered += buffer . Count ;
69
59
70
60
// Complete the write task immediately if all previous write tasks have been completed,
71
61
// the buffers haven't grown too large, and the last write to the socket succeeded.
72
- if ( _lastWriteError == null &&
73
- _callbacksPending . Count == 0 &&
74
- _numBytesBuffered < _maxBytesBufferedBeforeThrottling )
62
+ triggerCallbackNow = _lastWriteError == null &&
63
+ _callbacksPending . Count == 0 &&
64
+ _numBytesBuffered <= _maxBytesBufferedBeforeThrottling ;
65
+ if ( ! triggerCallbackNow )
75
66
{
76
- TriggerCallback ( callbackContext ) ;
77
- }
78
- else
79
- {
80
- _callbacksPending . Enqueue ( callbackContext ) ;
67
+ _callbacksPending . Enqueue ( new CallbackContext
68
+ {
69
+ Callback = callback ,
70
+ State = state ,
71
+ BytesToWrite = buffer . Count
72
+ } ) ;
81
73
}
82
74
83
75
if ( _writesPending < _maxPendingWrites )
@@ -86,6 +78,11 @@ public void Write(ArraySegment<byte> buffer, Action<Exception, object> callback,
86
78
_writesPending ++ ;
87
79
}
88
80
}
81
+
82
+ if ( triggerCallbackNow )
83
+ {
84
+ callback ( null , state ) ;
85
+ }
89
86
}
90
87
91
88
private void ScheduleWrite ( )
@@ -118,19 +115,21 @@ private void WriteAllPending()
118
115
119
116
try
120
117
{
121
- var buffers = new ArraySegment < byte > [ writingContext . Operations . Count ] ;
118
+ var buffers = new ArraySegment < byte > [ writingContext . Buffers . Count ] ;
122
119
123
120
var i = 0 ;
124
- foreach ( var writeOp in writingContext . Operations )
121
+ foreach ( var buffer in writingContext . Buffers )
125
122
{
126
- buffers [ i ] = writeOp . Buffer ;
127
- i ++ ;
123
+ buffers [ i ++ ] = buffer ;
128
124
}
129
125
130
- writingContext . WriteReq . Write ( _socket , new ArraySegment < ArraySegment < byte > > ( buffers ) , ( r , status , error , state ) =>
126
+ var writeReq = new UvWriteReq ( ) ;
127
+ writeReq . Init ( _thread . Loop ) ;
128
+
129
+ writeReq . Write ( _socket , new ArraySegment < ArraySegment < byte > > ( buffers ) , ( r , status , error , state ) =>
131
130
{
132
131
var writtenContext = ( WriteContext ) state ;
133
- writtenContext . Self . OnWriteCompleted ( writtenContext . Operations , r , status , error ) ;
132
+ writtenContext . Self . OnWriteCompleted ( writtenContext . Buffers , r , status , error ) ;
134
133
} , writingContext ) ;
135
134
}
136
135
catch
@@ -147,8 +146,10 @@ private void WriteAllPending()
147
146
}
148
147
149
148
// This is called on the libuv event loop
150
- private void OnWriteCompleted ( Queue < WriteOperation > completedWrites , UvWriteReq req , int status , Exception error )
149
+ private void OnWriteCompleted ( Queue < ArraySegment < byte > > writtenBuffers , UvWriteReq req , int status , Exception error )
151
150
{
151
+ KestrelTrace . Log . ConnectionWriteCallback ( 0 , status ) ;
152
+
152
153
lock ( _lockObj )
153
154
{
154
155
_lastWriteError = error ;
@@ -162,16 +163,16 @@ private void OnWriteCompleted(Queue<WriteOperation> completedWrites, UvWriteReq
162
163
_writesPending -- ;
163
164
}
164
165
165
- foreach ( var writeOp in completedWrites )
166
+ foreach ( var writeBuffer in writtenBuffers )
166
167
{
167
- _numBytesBuffered -= writeOp . Buffer . Count ;
168
+ _numBytesBuffered -= writeBuffer . Count ;
168
169
}
169
170
170
171
var bytesLeftToBuffer = _maxBytesBufferedBeforeThrottling - _numBytesBuffered ;
171
- while ( _callbacksPending . Count > 0 && _callbacksPending . Peek ( ) . BytesToWrite < bytesLeftToBuffer )
172
+ while ( _callbacksPending . Count > 0 &&
173
+ _callbacksPending . Peek ( ) . BytesToWrite <= bytesLeftToBuffer )
172
174
{
173
- var context = _callbacksPending . Dequeue ( ) ;
174
- TriggerCallback ( context ) ;
175
+ TriggerCallback ( _callbacksPending . Dequeue ( ) ) ;
175
176
}
176
177
}
177
178
@@ -196,27 +197,16 @@ private class CallbackContext
196
197
public int BytesToWrite ;
197
198
}
198
199
199
- private class WriteOperation
200
- {
201
- public ArraySegment < byte > Buffer ;
202
- }
203
-
204
200
private class WriteContext
205
201
{
206
202
public WriteContext ( SocketOutput self )
207
203
{
208
204
Self = self ;
209
-
210
- WriteReq = new UvWriteReq ( ) ;
211
- WriteReq . Init ( self . _thread . Loop ) ;
212
-
213
- Operations = new Queue < WriteOperation > ( ) ;
205
+ Buffers = new Queue < ArraySegment < byte > > ( ) ;
214
206
}
215
207
216
208
public SocketOutput Self ;
217
-
218
- public UvWriteReq WriteReq ;
219
- public Queue < WriteOperation > Operations ;
209
+ public Queue < ArraySegment < byte > > Buffers ;
220
210
}
221
211
}
222
212
}
0 commit comments