@@ -45,17 +45,7 @@ public void Write(ArraySegment<byte> buffer, Action<Exception, object> callback,
45
45
46
46
KestrelTrace . Log . ConnectionWrite ( 0 , buffer . Count ) ;
47
47
48
- var writeOp = new WriteOperation
49
- {
50
- Buffer = buffer
51
- } ;
52
-
53
- var callbackContext = new CallbackContext
54
- {
55
- Callback = callback ,
56
- State = state ,
57
- BytesToWrite = buffer . Count
58
- } ;
48
+ bool triggerCallbackNow = false ;
59
49
60
50
lock ( _lockObj )
61
51
{
@@ -64,20 +54,22 @@ public void Write(ArraySegment<byte> buffer, Action<Exception, object> callback,
64
54
_nextWriteContext = new WriteContext ( this ) ;
65
55
}
66
56
67
- _nextWriteContext . Operations . Enqueue ( writeOp ) ;
57
+ _nextWriteContext . Buffers . Enqueue ( buffer ) ;
68
58
_numBytesBuffered += buffer . Count ;
69
59
70
60
// Complete the write task immediately if all previous write tasks have been completed,
71
61
// the buffers haven't grown too large, and the last write to the socket succeeded.
72
- if ( _lastWriteError == null &&
73
- _callbacksPending . Count == 0 &&
74
- _numBytesBuffered < _maxBytesBufferedBeforeThrottling )
75
- {
76
- TriggerCallback ( callbackContext ) ;
77
- }
78
- else
62
+ triggerCallbackNow = _lastWriteError == null &&
63
+ _callbacksPending . Count == 0 &&
64
+ _numBytesBuffered < _maxBytesBufferedBeforeThrottling ;
65
+ if ( ! triggerCallbackNow )
79
66
{
80
- _callbacksPending . Enqueue ( callbackContext ) ;
67
+ _callbacksPending . Enqueue ( new CallbackContext
68
+ {
69
+ Callback = callback ,
70
+ State = state ,
71
+ BytesToWrite = buffer . Count
72
+ } ) ;
81
73
}
82
74
83
75
if ( _writesPending < _maxPendingWrites )
@@ -86,6 +78,11 @@ public void Write(ArraySegment<byte> buffer, Action<Exception, object> callback,
86
78
_writesPending ++ ;
87
79
}
88
80
}
81
+
82
+ if ( triggerCallbackNow )
83
+ {
84
+ callback ( null , state ) ;
85
+ }
89
86
}
90
87
91
88
private void ScheduleWrite ( )
@@ -118,19 +115,21 @@ private void WriteAllPending()
118
115
119
116
try
120
117
{
121
- var buffers = new ArraySegment < byte > [ writingContext . Operations . Count ] ;
118
+ var buffers = new ArraySegment < byte > [ writingContext . Buffers . Count ] ;
122
119
123
120
var i = 0 ;
124
- foreach ( var writeOp in writingContext . Operations )
121
+ foreach ( var buffer in writingContext . Buffers )
125
122
{
126
- buffers [ i ] = writeOp . Buffer ;
127
- i ++ ;
123
+ buffers [ i ++ ] = buffer ;
128
124
}
129
125
130
- writingContext . WriteReq . Write ( _socket , new ArraySegment < ArraySegment < byte > > ( buffers ) , ( r , status , error , state ) =>
126
+ var writeReq = new UvWriteReq ( ) ;
127
+ writeReq . Init ( _thread . Loop ) ;
128
+
129
+ writeReq . Write ( _socket , new ArraySegment < ArraySegment < byte > > ( buffers ) , ( r , status , error , state ) =>
131
130
{
132
131
var writtenContext = ( WriteContext ) state ;
133
- writtenContext . Self . OnWriteCompleted ( writtenContext . Operations , r , status , error ) ;
132
+ writtenContext . Self . OnWriteCompleted ( writtenContext . Buffers , r , status , error ) ;
134
133
} , writingContext ) ;
135
134
}
136
135
catch
@@ -147,7 +146,7 @@ private void WriteAllPending()
147
146
}
148
147
149
148
// This is called on the libuv event loop
150
- private void OnWriteCompleted ( Queue < WriteOperation > completedWrites , UvWriteReq req , int status , Exception error )
149
+ private void OnWriteCompleted ( Queue < ArraySegment < byte > > writtenBuffers , UvWriteReq req , int status , Exception error )
151
150
{
152
151
lock ( _lockObj )
153
152
{
@@ -162,16 +161,16 @@ private void OnWriteCompleted(Queue<WriteOperation> completedWrites, UvWriteReq
162
161
_writesPending -- ;
163
162
}
164
163
165
- foreach ( var writeOp in completedWrites )
164
+ foreach ( var writeBuffer in writtenBuffers )
166
165
{
167
- _numBytesBuffered -= writeOp . Buffer . Count ;
166
+ _numBytesBuffered -= writeBuffer . Count ;
168
167
}
169
168
170
169
var bytesLeftToBuffer = _maxBytesBufferedBeforeThrottling - _numBytesBuffered ;
171
- while ( _callbacksPending . Count > 0 && _callbacksPending . Peek ( ) . BytesToWrite < bytesLeftToBuffer )
170
+ while ( _callbacksPending . Count > 0 &&
171
+ _callbacksPending . Peek ( ) . BytesToWrite < bytesLeftToBuffer )
172
172
{
173
- var context = _callbacksPending . Dequeue ( ) ;
174
- TriggerCallback ( context ) ;
173
+ TriggerCallback ( _callbacksPending . Dequeue ( ) ) ;
175
174
}
176
175
}
177
176
@@ -196,27 +195,16 @@ private class CallbackContext
196
195
public int BytesToWrite ;
197
196
}
198
197
199
- private class WriteOperation
200
- {
201
- public ArraySegment < byte > Buffer ;
202
- }
203
-
204
198
private class WriteContext
205
199
{
206
200
public WriteContext ( SocketOutput self )
207
201
{
208
202
Self = self ;
209
-
210
- WriteReq = new UvWriteReq ( ) ;
211
- WriteReq . Init ( self . _thread . Loop ) ;
212
-
213
- Operations = new Queue < WriteOperation > ( ) ;
203
+ Buffers = new Queue < ArraySegment < byte > > ( ) ;
214
204
}
215
205
216
206
public SocketOutput Self ;
217
-
218
- public UvWriteReq WriteReq ;
219
- public Queue < WriteOperation > Operations ;
207
+ public Queue < ArraySegment < byte > > Buffers ;
220
208
}
221
209
}
222
210
}
0 commit comments