5
5
6
6
using System . Collections . Generic ;
7
7
using System . Diagnostics ;
8
+ using System . Diagnostics . CodeAnalysis ;
8
9
using System . Threading . Tasks ;
9
10
10
11
namespace System . Threading . RateLimiting
@@ -19,16 +20,18 @@ public sealed class TokenBucketRateLimiter : RateLimiter
19
20
private long _lastReplenishmentTick = Environment . TickCount ;
20
21
21
22
private readonly Timer ? _renewTimer ;
22
- private readonly object _lock = new object ( ) ;
23
23
private readonly TokenBucketRateLimiterOptions _options ;
24
24
private readonly Deque < RequestRegistration > _queue = new Deque < RequestRegistration > ( ) ;
25
25
26
+ // Use the queue as the lock field so we don't need to allocate another object for a lock and have another field in the object
27
+ private object Lock => _queue ;
28
+
26
29
private static readonly RateLimitLease SuccessfulLease = new TokenBucketLease ( true , null ) ;
27
30
28
31
/// <summary>
29
32
/// Initializes the <see cref="TokenBucketRateLimiter"/>.
30
33
/// </summary>
31
- /// <param name="options"></param>
34
+ /// <param name="options">Options to specify the behavior of the <see cref="TokenBucketRateLimiter"/>. </param>
32
35
public TokenBucketRateLimiter ( TokenBucketRateLimiterOptions options )
33
36
{
34
37
_tokenCount = options . TokenLimit ;
@@ -40,10 +43,7 @@ public TokenBucketRateLimiter(TokenBucketRateLimiterOptions options)
40
43
}
41
44
}
42
45
43
- /// <summary>
44
- /// An estimated count of available tokens.
45
- /// </summary>
46
- /// <returns></returns>
46
+ /// <inheritdoc/>
47
47
public override int GetAvailablePermits ( ) => _tokenCount ;
48
48
49
49
/// <inheritdoc/>
@@ -52,26 +52,25 @@ protected override RateLimitLease AcquireCore(int tokenCount)
52
52
// These amounts of resources can never be acquired
53
53
if ( tokenCount > _options . TokenLimit )
54
54
{
55
- throw new InvalidOperationException ( $ "{ tokenCount } tokens exceeds the token limit of { _options . TokenLimit } .") ;
55
+ throw new ArgumentOutOfRangeException ( nameof ( tokenCount ) , $ "{ tokenCount } tokens exceeds the token limit of { _options . TokenLimit } .") ;
56
56
}
57
57
58
- // Return SuccessfulAcquisition or FailedAcquisition depending to indicate limiter state
58
+ // Return SuccessfulLease or FailedLease depending to indicate limiter state
59
59
if ( tokenCount == 0 )
60
60
{
61
- if ( GetAvailablePermits ( ) > 0 )
61
+ if ( _tokenCount > 0 )
62
62
{
63
63
return SuccessfulLease ;
64
64
}
65
65
66
66
return CreateFailedTokenLease ( tokenCount ) ;
67
67
}
68
68
69
- lock ( _lock )
69
+ lock ( Lock )
70
70
{
71
- if ( GetAvailablePermits ( ) >= tokenCount )
71
+ if ( TryLeaseUnsynchronized ( tokenCount , out RateLimitLease ? lease ) )
72
72
{
73
- _tokenCount -= tokenCount ;
74
- return SuccessfulLease ;
73
+ return lease ;
75
74
}
76
75
77
76
return CreateFailedTokenLease ( tokenCount ) ;
@@ -86,23 +85,20 @@ protected override ValueTask<RateLimitLease> WaitAsyncCore(int tokenCount, Cance
86
85
// These amounts of resources can never be acquired
87
86
if ( tokenCount > _options . TokenLimit )
88
87
{
89
- throw new InvalidOperationException ( $ "{ tokenCount } token(s) exceeds the permit limit of { _options . TokenLimit } .") ;
88
+ throw new ArgumentOutOfRangeException ( nameof ( tokenCount ) , $ "{ tokenCount } token(s) exceeds the permit limit of { _options . TokenLimit } .") ;
90
89
}
91
90
92
91
// Return SuccessfulAcquisition if requestedCount is 0 and resources are available
93
- if ( tokenCount == 0 && GetAvailablePermits ( ) > 0 )
92
+ if ( tokenCount == 0 && _tokenCount > 0 )
94
93
{
95
- // Perf: static failed/successful value tasks?
96
94
return new ValueTask < RateLimitLease > ( SuccessfulLease ) ;
97
95
}
98
96
99
- lock ( _lock )
97
+ lock ( Lock )
100
98
{
101
- if ( GetAvailablePermits ( ) >= tokenCount && GetAvailablePermits ( ) != 0 )
99
+ if ( TryLeaseUnsynchronized ( tokenCount , out RateLimitLease ? lease ) )
102
100
{
103
- _tokenCount -= tokenCount ;
104
- // Perf: static failed/successful value tasks?
105
- return new ValueTask < RateLimitLease > ( SuccessfulLease ) ;
101
+ return new ValueTask < RateLimitLease > ( lease ) ;
106
102
}
107
103
108
104
// Don't queue if queue limit reached
@@ -116,8 +112,12 @@ protected override ValueTask<RateLimitLease> WaitAsyncCore(int tokenCount, Cance
116
112
CancellationTokenRegistration ctr ;
117
113
if ( cancellationToken . CanBeCanceled )
118
114
{
119
- ctr = cancellationToken . Register ( obj => CancellationRequested ( ( TaskCompletionSource < RateLimitLease > ) obj , cancellationToken ) , tcs ) ;
115
+ ctr = cancellationToken . Register ( obj =>
116
+ {
117
+ ( ( TaskCompletionSource < RateLimitLease > ) obj ) . TrySetException ( new OperationCanceledException ( cancellationToken ) ) ;
118
+ } , tcs ) ;
120
119
}
120
+
121
121
RequestRegistration registration = new RequestRegistration ( tokenCount , tcs , ctr ) ;
122
122
_queue . EnqueueTail ( registration ) ;
123
123
_queueCount += tokenCount ;
@@ -130,12 +130,39 @@ protected override ValueTask<RateLimitLease> WaitAsyncCore(int tokenCount, Cance
130
130
131
131
private RateLimitLease CreateFailedTokenLease ( int tokenCount )
132
132
{
133
- int replenishAmount = tokenCount - GetAvailablePermits ( ) + _queueCount ;
133
+ int replenishAmount = tokenCount - _tokenCount + _queueCount ;
134
134
// can't have 0 replenish periods, that would mean it should be a successful lease
135
135
// if TokensPerPeriod is larger than the replenishAmount needed then it would be 0
136
136
int replenishPeriods = Math . Max ( replenishAmount / _options . TokensPerPeriod , 1 ) ;
137
137
138
- return new TokenBucketLease ( false , TimeSpan . FromTicks ( _options . ReplenishmentPeriod . Ticks * replenishPeriods ) ) ;
138
+ return new TokenBucketLease ( false , TimeSpan . FromTicks ( _options . ReplenishmentPeriod . Ticks * replenishPeriods ) ) ;
139
+ }
140
+
141
+ private bool TryLeaseUnsynchronized ( int tokenCount , [ NotNullWhen ( true ) ] out RateLimitLease ? lease )
142
+ {
143
+ // if permitCount is 0 we want to queue it if there are no available permits
144
+ if ( _tokenCount >= tokenCount && _tokenCount != 0 )
145
+ {
146
+ if ( tokenCount == 0 )
147
+ {
148
+ // Edge case where the check before the lock showed 0 available permits but when we got the lock some permits were now available
149
+ lease = SuccessfulLease ;
150
+ return true ;
151
+ }
152
+
153
+ // a. if there are no items queued we can lease
154
+ // b. if there are items queued but the processing order is newest first, then we can lease the incoming request since it is the newest
155
+ if ( _queueCount == 0 || ( _queueCount > 0 && _options . QueueProcessingOrder == QueueProcessingOrder . NewestFirst ) )
156
+ {
157
+ _tokenCount -= tokenCount ;
158
+ Debug . Assert ( _tokenCount >= 0 ) ;
159
+ lease = SuccessfulLease ;
160
+ return true ;
161
+ }
162
+ }
163
+
164
+ lease = null ;
165
+ return false ;
139
166
}
140
167
141
168
/// <summary>
@@ -165,7 +192,7 @@ private static void Replenish(object? state)
165
192
long nowTicks = Environment . TickCount * TimeSpan . TicksPerMillisecond ;
166
193
167
194
// method is re-entrant (from Timer), lock to avoid multiple simultaneous replenishes
168
- lock ( limiter ! . _lock )
195
+ lock ( limiter ! . Lock )
169
196
{
170
197
if ( nowTicks - limiter . _lastReplenishmentTick < limiter . _options . ReplenishmentPeriod . Ticks )
171
198
{
@@ -214,7 +241,6 @@ private static void Replenish(object? state)
214
241
Debug . Assert ( limiter . _queueCount >= 0 ) ;
215
242
Debug . Assert ( limiter . _tokenCount >= 0 ) ;
216
243
217
- // requestToFulfill == request
218
244
if ( ! nextPendingRequest . Tcs . TrySetResult ( SuccessfulLease ) )
219
245
{
220
246
// Queued item was canceled so add count back
@@ -231,14 +257,6 @@ private static void Replenish(object? state)
231
257
}
232
258
}
233
259
234
- private void CancellationRequested ( TaskCompletionSource < RateLimitLease > tcs , CancellationToken token )
235
- {
236
- lock ( _lock )
237
- {
238
- tcs . TrySetException ( new OperationCanceledException ( token ) ) ;
239
- }
240
- }
241
-
242
260
private class TokenBucketLease : RateLimitLease
243
261
{
244
262
private readonly TimeSpan ? _retryAfter ;
@@ -255,12 +273,10 @@ public TokenBucketLease(bool isAcquired, TimeSpan? retryAfter)
255
273
256
274
private IEnumerable < string > Enumerable ( )
257
275
{
258
- if ( _retryAfter is null )
276
+ if ( _retryAfter is not null )
259
277
{
260
- yield break ;
278
+ yield return MetadataName . RetryAfter . Name ;
261
279
}
262
-
263
- yield return MetadataName . RetryAfter . Name ;
264
280
}
265
281
266
282
public override bool TryGetMetadata ( string metadataName , out object ? metadata )
@@ -271,14 +287,14 @@ public override bool TryGetMetadata(string metadataName, out object? metadata)
271
287
return true ;
272
288
}
273
289
274
- metadata = null ;
290
+ metadata = default ;
275
291
return false ;
276
292
}
277
293
278
294
protected override void Dispose ( bool disposing ) { }
279
295
}
280
296
281
- private struct RequestRegistration
297
+ private readonly struct RequestRegistration
282
298
{
283
299
public RequestRegistration ( int tokenCount , TaskCompletionSource < RateLimitLease > tcs , CancellationTokenRegistration cancellationTokenRegistration )
284
300
{
0 commit comments