@@ -29,13 +29,24 @@ public class FrameConnection : IConnectionContext, ITimeoutControl
29
29
private long _timeoutTimestamp = long . MaxValue ;
30
30
private TimeoutAction _timeoutAction ;
31
31
32
+ private object _readTimingLock = new object ( ) ;
33
+ private bool _readTimingEnabled ;
34
+ private bool _readTimingPauseRequested ;
35
+ private long _readTimingElapsedTicks ;
36
+ private long _readTimingBytesRead ;
37
+
32
38
private Task _lifetimeTask ;
33
39
34
40
public FrameConnection ( FrameConnectionContext context )
35
41
{
36
42
_context = context ;
37
43
}
38
44
45
+ // For testing
46
+ internal Frame Frame => _frame ;
47
+
48
+ public bool TimedOut { get ; private set ; }
49
+
39
50
public string ConnectionId => _context . ConnectionId ;
40
51
public IPipeWriter Input => _context . Input . Writer ;
41
52
public IPipeReader Output => _context . Output . Reader ;
@@ -91,15 +102,7 @@ private async Task ProcessRequestsAsync<TContext>(IHttpApplication<TContext> app
91
102
}
92
103
93
104
// _frame must be initialized before adding the connection to the connection manager
94
- _frame = new Frame < TContext > ( application , new FrameContext
95
- {
96
- ConnectionId = _context . ConnectionId ,
97
- ConnectionInformation = _context . ConnectionInformation ,
98
- ServiceContext = _context . ServiceContext ,
99
- TimeoutControl = this ,
100
- Input = input ,
101
- Output = output
102
- } ) ;
105
+ CreateFrame ( application , input , output ) ;
103
106
104
107
// Do this before the first await so we don't yield control to the transport until we've
105
108
// added the connection to the connection manager
@@ -140,9 +143,22 @@ private async Task ProcessRequestsAsync<TContext>(IHttpApplication<TContext> app
140
143
}
141
144
}
142
145
146
+ internal void CreateFrame < TContext > ( IHttpApplication < TContext > application , IPipeReader input , IPipe output )
147
+ {
148
+ _frame = new Frame < TContext > ( application , new FrameContext
149
+ {
150
+ ConnectionId = _context . ConnectionId ,
151
+ ConnectionInformation = _context . ConnectionInformation ,
152
+ ServiceContext = _context . ServiceContext ,
153
+ TimeoutControl = this ,
154
+ Input = input ,
155
+ Output = output
156
+ } ) ;
157
+ }
158
+
143
159
public void OnConnectionClosed ( Exception ex )
144
160
{
145
- Debug . Assert ( _frame != null , $ "nameof({ _frame } ) is null") ;
161
+ Debug . Assert ( _frame != null , $ "{ nameof ( _frame ) } is null") ;
146
162
147
163
// Abort the connection (if not already aborted)
148
164
_frame . Abort ( ex ) ;
@@ -152,7 +168,7 @@ public void OnConnectionClosed(Exception ex)
152
168
153
169
public Task StopAsync ( )
154
170
{
155
- Debug . Assert ( _frame != null , $ "nameof({ _frame } ) is null") ;
171
+ Debug . Assert ( _frame != null , $ "{ nameof ( _frame ) } is null") ;
156
172
157
173
_frame . Stop ( ) ;
158
174
@@ -161,32 +177,41 @@ public Task StopAsync()
161
177
162
178
public void Abort ( Exception ex )
163
179
{
164
- Debug . Assert ( _frame != null , $ "nameof({ _frame } ) is null") ;
180
+ Debug . Assert ( _frame != null , $ "{ nameof ( _frame ) } is null") ;
165
181
166
182
// Abort the connection (if not already aborted)
167
183
_frame . Abort ( ex ) ;
168
184
}
169
185
170
186
public Task AbortAsync ( Exception ex )
171
187
{
172
- Debug . Assert ( _frame != null , $ "nameof({ _frame } ) is null") ;
188
+ Debug . Assert ( _frame != null , $ "{ nameof ( _frame ) } is null") ;
173
189
174
190
// Abort the connection (if not already aborted)
175
191
_frame . Abort ( ex ) ;
176
192
177
193
return _lifetimeTask ;
178
194
}
179
195
180
- public void Timeout ( )
196
+ public void SetTimeoutResponse ( )
181
197
{
182
- Debug . Assert ( _frame != null , $ "nameof({ _frame } ) is null") ;
198
+ Debug . Assert ( _frame != null , $ "{ nameof ( _frame ) } is null") ;
183
199
184
200
_frame . SetBadRequestState ( RequestRejectionReason . RequestTimeout ) ;
185
201
}
186
202
203
+ public void Timeout ( )
204
+ {
205
+ Debug . Assert ( _frame != null , $ "{ nameof ( _frame ) } is null") ;
206
+
207
+ TimedOut = true ;
208
+ _readTimingEnabled = false ;
209
+ _frame . Stop ( ) ;
210
+ }
211
+
187
212
private async Task < Stream > ApplyConnectionAdaptersAsync ( )
188
213
{
189
- Debug . Assert ( _frame != null , $ "nameof({ _frame } ) is null") ;
214
+ Debug . Assert ( _frame != null , $ "{ nameof ( _frame ) } is null") ;
190
215
191
216
var features = new FeatureCollection ( ) ;
192
217
var connectionAdapters = _context . ConnectionAdapters ;
@@ -231,7 +256,7 @@ private void DisposeAdaptedConnections()
231
256
232
257
public void Tick ( DateTimeOffset now )
233
258
{
234
- Debug . Assert ( _frame != null , $ "nameof({ _frame } ) is null") ;
259
+ Debug . Assert ( _frame != null , $ "{ nameof ( _frame ) } is null") ;
235
260
236
261
var timestamp = now . Ticks ;
237
262
@@ -242,10 +267,41 @@ public void Tick(DateTimeOffset now)
242
267
243
268
if ( _timeoutAction == TimeoutAction . SendTimeoutResponse )
244
269
{
245
- Timeout ( ) ;
270
+ SetTimeoutResponse ( ) ;
246
271
}
247
272
248
- _frame . Stop ( ) ;
273
+ Timeout ( ) ;
274
+ }
275
+ else
276
+ {
277
+ lock ( _readTimingLock )
278
+ {
279
+ if ( _readTimingEnabled )
280
+ {
281
+ _readTimingElapsedTicks += timestamp - _lastTimestamp ;
282
+
283
+ if ( _frame . RequestBodyMinimumDataRate ? . Rate > 0 && _readTimingElapsedTicks > _frame . RequestBodyMinimumDataRate . GracePeriod . Ticks )
284
+ {
285
+ var elapsedSeconds = ( double ) _readTimingElapsedTicks / TimeSpan . TicksPerSecond ;
286
+ var rate = Interlocked . Read ( ref _readTimingBytesRead ) / elapsedSeconds ;
287
+
288
+ if ( rate < _frame . RequestBodyMinimumDataRate . Rate )
289
+ {
290
+ Log . RequestBodyMininumDataRateNotSatisfied ( _context . ConnectionId , _frame . TraceIdentifier , _frame . RequestBodyMinimumDataRate . Rate ) ;
291
+ Timeout ( ) ;
292
+ }
293
+ }
294
+
295
+ // PauseTimingReads() cannot just set _timingReads to false. It needs to go through at least one tick
296
+ // before pausing, otherwise _readTimingElapsed might never be updated if PauseTimingReads() is always
297
+ // called before the next tick.
298
+ if ( _readTimingPauseRequested )
299
+ {
300
+ _readTimingEnabled = false ;
301
+ _readTimingPauseRequested = false ;
302
+ }
303
+ }
304
+ }
249
305
}
250
306
251
307
Interlocked . Exchange ( ref _lastTimestamp , timestamp ) ;
@@ -275,5 +331,47 @@ private void AssignTimeout(long ticks, TimeoutAction timeoutAction)
275
331
// Add Heartbeat.Interval since this can be called right before the next heartbeat.
276
332
Interlocked . Exchange ( ref _timeoutTimestamp , _lastTimestamp + ticks + Heartbeat . Interval . Ticks ) ;
277
333
}
334
+
335
+ public void StartTimingReads ( )
336
+ {
337
+ lock ( _readTimingLock )
338
+ {
339
+ _readTimingElapsedTicks = 0 ;
340
+ _readTimingBytesRead = 0 ;
341
+ _readTimingEnabled = true ;
342
+ }
343
+ }
344
+
345
+ public void StopTimingReads ( )
346
+ {
347
+ lock ( _readTimingLock )
348
+ {
349
+ _readTimingEnabled = false ;
350
+ }
351
+ }
352
+
353
+ public void PauseTimingReads ( )
354
+ {
355
+ lock ( _readTimingLock )
356
+ {
357
+ _readTimingPauseRequested = true ;
358
+ }
359
+ }
360
+
361
+ public void ResumeTimingReads ( )
362
+ {
363
+ lock ( _readTimingLock )
364
+ {
365
+ _readTimingEnabled = true ;
366
+
367
+ // In case pause and resume were both called between ticks
368
+ _readTimingPauseRequested = false ;
369
+ }
370
+ }
371
+
372
+ public void BytesRead ( int count )
373
+ {
374
+ Interlocked . Add ( ref _readTimingBytesRead , count ) ;
375
+ }
278
376
}
279
377
}
0 commit comments