5
5
using System . IO ;
6
6
using System . Runtime . CompilerServices ;
7
7
using System . Threading ;
8
- using System . Threading . Tasks ;
9
8
using Microsoft . AspNet . Server . Kestrel . Infrastructure ;
10
9
11
10
namespace Microsoft . AspNet . Server . Kestrel . Http
@@ -25,7 +24,6 @@ public class SocketInput : ICriticalNotifyCompletion
25
24
private MemoryPoolBlock2 _head ;
26
25
private MemoryPoolBlock2 _tail ;
27
26
private MemoryPoolBlock2 _pinned ;
28
- private readonly object _sync = new Object ( ) ;
29
27
30
28
public SocketInput ( MemoryPool2 memory , IThreadPool threadPool )
31
29
{
@@ -34,98 +32,104 @@ public SocketInput(MemoryPool2 memory, IThreadPool threadPool)
34
32
_awaitableState = _awaitableIsNotCompleted ;
35
33
}
36
34
37
- public ArraySegment < byte > Buffer { get ; set ; }
38
-
39
35
public bool RemoteIntakeFin { get ; set ; }
40
36
41
- public bool IsCompleted
37
+ public bool IsCompleted => ( _awaitableState == _awaitableIsCompleted ) ;
38
+
39
+ public MemoryPoolBlock2 IncomingStart ( )
42
40
{
43
- get
41
+ const int minimumSize = 2048 ;
42
+
43
+ if ( _tail != null && minimumSize <= _tail . Data . Offset + _tail . Data . Count - _tail . End )
44
44
{
45
- return Equals ( _awaitableState , _awaitableIsCompleted ) ;
45
+ _pinned = _tail ;
46
+ }
47
+ else
48
+ {
49
+ _pinned = _memory . Lease ( ) ;
46
50
}
47
- }
48
-
49
- public void Skip ( int count )
50
- {
51
- Buffer = new ArraySegment < byte > ( Buffer . Array , Buffer . Offset + count , Buffer . Count - count ) ;
52
- }
53
51
54
- public ArraySegment < byte > Take ( int count )
55
- {
56
- var taken = new ArraySegment < byte > ( Buffer . Array , Buffer . Offset , count ) ;
57
- Skip ( count ) ;
58
- return taken ;
52
+ return _pinned ;
59
53
}
60
54
61
- public IncomingBuffer IncomingStart ( int minimumSize )
55
+ public void IncomingData ( byte [ ] buffer , int offset , int count )
62
56
{
63
- lock ( _sync )
57
+ if ( count > 0 )
64
58
{
65
- if ( _tail != null && minimumSize <= _tail . Data . Offset + _tail . Data . Count - _tail . End )
59
+ if ( _tail == null )
66
60
{
67
- _pinned = _tail ;
68
- var data = new ArraySegment < byte > ( _pinned . Data . Array , _pinned . End , _pinned . Data . Offset + _pinned . Data . Count - _pinned . End ) ;
69
- var dataPtr = _pinned . Pin ( ) + _pinned . End ;
70
- return new IncomingBuffer
71
- {
72
- Data = data ,
73
- DataPtr = dataPtr ,
74
- } ;
61
+ _tail = _memory . Lease ( ) ;
62
+ }
63
+
64
+ var iterator = new MemoryPoolIterator2 ( _tail , _tail . End ) ;
65
+ iterator . CopyFrom ( buffer , offset , count ) ;
66
+
67
+ if ( _head == null )
68
+ {
69
+ _head = _tail ;
75
70
}
76
- }
77
71
78
- _pinned = _memory . Lease ( minimumSize ) ;
79
- return new IncomingBuffer
72
+ _tail = iterator . Block ;
73
+ }
74
+ else
80
75
{
81
- Data = _pinned . Data ,
82
- DataPtr = _pinned . Pin ( ) + _pinned . End
83
- } ;
76
+ RemoteIntakeFin = true ;
77
+ }
78
+
79
+ Complete ( ) ;
84
80
}
85
81
86
82
public void IncomingComplete ( int count , Exception error )
87
83
{
88
- Action awaitableState ;
89
-
90
- lock ( _sync )
84
+ // Unpin may called without an earlier Pin
85
+ if ( _pinned != null )
91
86
{
92
- // Unpin may called without an earlier Pin
93
- if ( _pinned != null )
87
+
88
+ _pinned . End += count ;
89
+
90
+ if ( _head == null )
94
91
{
95
- _pinned . Unpin ( ) ;
96
-
97
- _pinned . End += count ;
98
- if ( _head == null )
99
- {
100
- _head = _tail = _pinned ;
101
- }
102
- else if ( _tail == _pinned )
103
- {
104
- // NO-OP: this was a read into unoccupied tail-space
105
- }
106
- else
107
- {
108
- _tail . Next = _pinned ;
109
- _tail = _pinned ;
110
- }
92
+ _head = _tail = _pinned ;
111
93
}
112
- _pinned = null ;
113
-
114
- if ( count == 0 )
94
+ else if ( _tail == _pinned )
115
95
{
116
- RemoteIntakeFin = true ;
96
+ // NO-OP: this was a read into unoccupied tail-space
117
97
}
118
- if ( error != null )
98
+ else
119
99
{
120
- _awaitableError = error ;
100
+ _tail . Next = _pinned ;
101
+ _tail = _pinned ;
121
102
}
122
103
123
- awaitableState = Interlocked . Exchange (
124
- ref _awaitableState ,
125
- _awaitableIsCompleted ) ;
104
+ _pinned = null ;
105
+ }
126
106
127
- _manualResetEvent . Set ( ) ;
107
+ if ( count == 0 )
108
+ {
109
+ RemoteIntakeFin = true ;
128
110
}
111
+ if ( error != null )
112
+ {
113
+ _awaitableError = error ;
114
+ }
115
+
116
+ Complete ( ) ;
117
+ }
118
+
119
+ public void AbortAwaiting ( )
120
+ {
121
+ _awaitableError = new ObjectDisposedException ( nameof ( SocketInput ) , "The request was aborted" ) ;
122
+
123
+ Complete ( ) ;
124
+ }
125
+
126
+ private void Complete ( )
127
+ {
128
+ var awaitableState = Interlocked . Exchange (
129
+ ref _awaitableState ,
130
+ _awaitableIsCompleted ) ;
131
+
132
+ _manualResetEvent . Set ( ) ;
129
133
130
134
if ( awaitableState != _awaitableIsCompleted &&
131
135
awaitableState != _awaitableIsNotCompleted )
@@ -136,10 +140,7 @@ public void IncomingComplete(int count, Exception error)
136
140
137
141
public MemoryPoolIterator2 ConsumingStart ( )
138
142
{
139
- lock ( _sync )
140
- {
141
- return new MemoryPoolIterator2 ( _head ) ;
142
- }
143
+ return new MemoryPoolIterator2 ( _head ) ;
143
144
}
144
145
145
146
public void ConsumingComplete (
@@ -148,50 +149,31 @@ public void ConsumingComplete(
148
149
{
149
150
MemoryPoolBlock2 returnStart = null ;
150
151
MemoryPoolBlock2 returnEnd = null ;
151
- lock ( _sync )
152
+ if ( ! consumed . IsDefault )
152
153
{
153
- if ( ! consumed . IsDefault )
154
- {
155
- returnStart = _head ;
156
- returnEnd = consumed . Block ;
157
- _head = consumed . Block ;
158
- _head . Start = consumed . Index ;
159
- }
160
- if ( ! examined . IsDefault &&
161
- examined . IsEnd &&
162
- RemoteIntakeFin == false &&
163
- _awaitableError == null )
164
- {
165
- _manualResetEvent . Reset ( ) ;
154
+ returnStart = _head ;
155
+ returnEnd = consumed . Block ;
156
+ _head = consumed . Block ;
157
+ _head . Start = consumed . Index ;
158
+ }
159
+ if ( ! examined . IsDefault &&
160
+ examined . IsEnd &&
161
+ RemoteIntakeFin == false &&
162
+ _awaitableError == null )
163
+ {
164
+ _manualResetEvent . Reset ( ) ;
166
165
167
- var awaitableState = Interlocked . CompareExchange (
168
- ref _awaitableState ,
169
- _awaitableIsNotCompleted ,
170
- _awaitableIsCompleted ) ;
171
- }
166
+ var awaitableState = Interlocked . CompareExchange (
167
+ ref _awaitableState ,
168
+ _awaitableIsNotCompleted ,
169
+ _awaitableIsCompleted ) ;
172
170
}
171
+
173
172
while ( returnStart != returnEnd )
174
173
{
175
174
var returnBlock = returnStart ;
176
175
returnStart = returnStart . Next ;
177
- returnBlock . Pool ? . Return ( returnBlock ) ;
178
- }
179
- }
180
-
181
- public void AbortAwaiting ( )
182
- {
183
- _awaitableError = new ObjectDisposedException ( nameof ( SocketInput ) , "The request was aborted" ) ;
184
-
185
- var awaitableState = Interlocked . Exchange (
186
- ref _awaitableState ,
187
- _awaitableIsCompleted ) ;
188
-
189
- _manualResetEvent . Set ( ) ;
190
-
191
- if ( awaitableState != _awaitableIsCompleted &&
192
- awaitableState != _awaitableIsNotCompleted )
193
- {
194
- _threadPool . Run ( awaitableState ) ;
176
+ returnBlock . Pool . Return ( returnBlock ) ;
195
177
}
196
178
}
197
179
@@ -247,11 +229,5 @@ public void GetResult()
247
229
throw new IOException ( error . Message , error ) ;
248
230
}
249
231
}
250
-
251
- public struct IncomingBuffer
252
- {
253
- public ArraySegment < byte > Data ;
254
- public IntPtr DataPtr ;
255
- }
256
232
}
257
233
}
0 commit comments