@@ -12,23 +12,28 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
12
12
public class SocketOutput : ISocketOutput
13
13
{
14
14
private const int _maxPendingWrites = 3 ;
15
+ private const int _maxBytesBufferedBeforeThrottling = 65536 / 8 ;
15
16
16
17
private readonly KestrelThread _thread ;
17
18
private readonly UvStreamHandle _socket ;
18
19
19
- private WriteContext _nextWriteContext ;
20
+ // This locks all access to to all the below
21
+ private readonly object _lockObj = new object ( ) ;
20
22
21
23
// The number of write operations that have been scheduled so far
22
24
// but have not completed.
23
- private int _writesSending = 0 ;
25
+ private int _writesPending = 0 ;
24
26
25
- // This locks all access to _nextWriteContext and _writesSending
26
- private readonly object _lockObj = new object ( ) ;
27
+ private int _numBytesBuffered = 0 ;
28
+ private Exception _lastWriteError ;
29
+ private WriteContext _nextWriteContext ;
30
+ private readonly Queue < CallbackContext > _callbacksPending ;
27
31
28
32
public SocketOutput ( KestrelThread thread , UvStreamHandle socket )
29
33
{
30
34
_thread = thread ;
31
35
_socket = socket ;
36
+ _callbacksPending = new Queue < CallbackContext > ( ) ;
32
37
}
33
38
34
39
public void Write ( ArraySegment < byte > buffer , Action < Exception , object > callback , object state )
@@ -40,11 +45,16 @@ public void Write(ArraySegment<byte> buffer, Action<Exception, object> callback,
40
45
41
46
KestrelTrace . Log . ConnectionWrite ( 0 , buffer . Count ) ;
42
47
43
- var context = new WriteOperation
48
+ var writeOp = new WriteOperation
49
+ {
50
+ Buffer = buffer
51
+ } ;
52
+
53
+ var callbackContext = new CallbackContext
44
54
{
45
- Buffer = buffer ,
46
55
Callback = callback ,
47
- State = state
56
+ State = state ,
57
+ BytesToWrite = buffer . Count
48
58
} ;
49
59
50
60
lock ( _lockObj )
@@ -54,12 +64,26 @@ public void Write(ArraySegment<byte> buffer, Action<Exception, object> callback,
54
64
_nextWriteContext = new WriteContext ( this ) ;
55
65
}
56
66
57
- _nextWriteContext . Operations . Add ( context ) ;
67
+ _nextWriteContext . Operations . Enqueue ( writeOp ) ;
68
+ _numBytesBuffered += buffer . Count ;
69
+
70
+ // Complete the write task immediately if all previous write tasks have been completed,
71
+ // the buffers haven't grown too large, and the last write to the socket succeeded.
72
+ if ( _lastWriteError == null &&
73
+ _callbacksPending . Count == 0 &&
74
+ _numBytesBuffered < _maxBytesBufferedBeforeThrottling )
75
+ {
76
+ TriggerCallback ( callbackContext ) ;
77
+ }
78
+ else
79
+ {
80
+ _callbacksPending . Enqueue ( callbackContext ) ;
81
+ }
58
82
59
- if ( _writesSending < _maxPendingWrites )
83
+ if ( _writesPending < _maxPendingWrites )
60
84
{
61
85
ScheduleWrite ( ) ;
62
- _writesSending ++ ;
86
+ _writesPending ++ ;
63
87
}
64
88
}
65
89
}
@@ -87,7 +111,7 @@ private void WriteAllPending()
87
111
}
88
112
else
89
113
{
90
- _writesSending -- ;
114
+ _writesPending -- ;
91
115
return ;
92
116
}
93
117
}
@@ -115,51 +139,66 @@ private void WriteAllPending()
115
139
{
116
140
// Lock instead of using Interlocked.Decrement so _writesSending
117
141
// doesn't change in the middle of executing other synchronized code.
118
- _writesSending -- ;
142
+ _writesPending -- ;
119
143
}
120
144
121
145
throw ;
122
146
}
123
147
}
124
148
125
149
// This is called on the libuv event loop
126
- private void OnWriteCompleted ( List < WriteOperation > completedWrites , UvWriteReq req , int status , Exception error )
150
+ private void OnWriteCompleted ( Queue < WriteOperation > completedWrites , UvWriteReq req , int status , Exception error )
127
151
{
128
152
lock ( _lockObj )
129
153
{
154
+ _lastWriteError = error ;
155
+
130
156
if ( _nextWriteContext != null )
131
157
{
132
158
ScheduleWrite ( ) ;
133
159
}
134
160
else
135
161
{
136
- _writesSending -- ;
162
+ _writesPending -- ;
163
+ }
164
+
165
+ foreach ( var writeOp in completedWrites )
166
+ {
167
+ _numBytesBuffered -= writeOp . Buffer . Count ;
168
+ }
169
+
170
+ var bytesLeftToBuffer = _maxBytesBufferedBeforeThrottling - _numBytesBuffered ;
171
+ while ( _callbacksPending . Count > 0 && _callbacksPending . Peek ( ) . BytesToWrite < bytesLeftToBuffer )
172
+ {
173
+ var context = _callbacksPending . Dequeue ( ) ;
174
+ TriggerCallback ( context ) ;
137
175
}
138
176
}
139
177
140
178
req . Dispose ( ) ;
179
+ }
141
180
142
- foreach ( var writeOp in completedWrites )
181
+ private void TriggerCallback ( CallbackContext context )
182
+ {
183
+ context . Error = _lastWriteError ;
184
+ ThreadPool . QueueUserWorkItem ( obj =>
143
185
{
144
- KestrelTrace . Log . ConnectionWriteCallback ( 0 , status ) ;
145
- //NOTE: pool this?
146
-
147
- // Get off the event loop before calling user code!
148
- writeOp . Error = error ;
149
- ThreadPool . QueueUserWorkItem ( obj =>
150
- {
151
- var op = ( WriteOperation ) obj ;
152
- op . Callback ( op . Error , op . State ) ;
153
- } , writeOp ) ;
154
- }
186
+ var c = ( CallbackContext ) obj ;
187
+ c . Callback ( c . Error , c . State ) ;
188
+ } , context ) ;
155
189
}
156
190
157
- private class WriteOperation
191
+ private class CallbackContext
158
192
{
159
- public ArraySegment < byte > Buffer ;
160
193
public Exception Error ;
161
194
public Action < Exception , object > Callback ;
162
195
public object State ;
196
+ public int BytesToWrite ;
197
+ }
198
+
199
+ private class WriteOperation
200
+ {
201
+ public ArraySegment < byte > Buffer ;
163
202
}
164
203
165
204
private class WriteContext
@@ -171,13 +210,13 @@ public WriteContext(SocketOutput self)
171
210
WriteReq = new UvWriteReq ( ) ;
172
211
WriteReq . Init ( self . _thread . Loop ) ;
173
212
174
- Operations = new List < WriteOperation > ( ) ;
213
+ Operations = new Queue < WriteOperation > ( ) ;
175
214
}
176
215
177
216
public SocketOutput Self ;
178
217
179
218
public UvWriteReq WriteReq ;
180
- public List < WriteOperation > Operations ;
219
+ public Queue < WriteOperation > Operations ;
181
220
}
182
221
}
183
222
}
0 commit comments