Skip to content

Commit df6e3f6

Browse files
TokenBucketRateLimiter implementation (#390)
1 parent 4856c28 commit df6e3f6

7 files changed

+902
-110
lines changed

src/RateLimiting/src/ConcurrencyLimiter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ protected override ValueTask<RateLimitLease> WaitAsyncCore(int permitCount, Canc
8282
throw new ArgumentOutOfRangeException(nameof(permitCount), $"{permitCount} permits exceeds the permit limit of {_options.PermitLimit}.");
8383
}
8484

85-
// Return SuccessfulAcquisition if requestedCount is 0 and resources are available
85+
// Return SuccessfulLease if requestedCount is 0 and resources are available
8686
if (permitCount == 0 && _permitCount > 0)
8787
{
8888
return new ValueTask<RateLimitLease>(SuccessfulLease);

src/RateLimiting/src/System.Threading.RateLimiting.csproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,8 @@
2020
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.4" />
2121
</ItemGroup>
2222

23+
<ItemGroup>
24+
<InternalsVisibleTo Include="System.Threading.RateLimiting.Tests" />
25+
</ItemGroup>
26+
2327
</Project>

src/RateLimiting/src/TokenBucketRateLimiter.cs

Lines changed: 171 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,34 @@
44
// Pending dotnet API review
55

66
using System.Collections.Generic;
7+
using System.Diagnostics;
8+
using System.Diagnostics.CodeAnalysis;
79
using System.Threading.Tasks;
810

911
namespace System.Threading.RateLimiting
1012
{
11-
#pragma warning disable 1591
13+
/// <summary>
14+
/// <see cref="RateLimiter"/> implementation that replenishes tokens periodically instead of via a release mechanism.
15+
/// </summary>
1216
public sealed class TokenBucketRateLimiter : RateLimiter
1317
{
1418
private int _tokenCount;
1519
private int _queueCount;
16-
private long _lastReplenishmentTick;
20+
private uint _lastReplenishmentTick = (uint)Environment.TickCount;
1721

1822
private readonly Timer? _renewTimer;
19-
private readonly object _lock = new object();
2023
private readonly TokenBucketRateLimiterOptions _options;
2124
private readonly Deque<RequestRegistration> _queue = new Deque<RequestRegistration>();
2225

26+
// Use the queue as the lock field so we don't need to allocate another object for a lock and have another field in the object
27+
private object Lock => _queue;
28+
2329
private static readonly RateLimitLease SuccessfulLease = new TokenBucketLease(true, null);
2430

31+
/// <summary>
32+
/// Initializes the <see cref="TokenBucketRateLimiter"/>.
33+
/// </summary>
34+
/// <param name="options">Options to specify the behavior of the <see cref="TokenBucketRateLimiter"/>.</param>
2535
public TokenBucketRateLimiter(TokenBucketRateLimiterOptions options)
2636
{
2737
_tokenCount = options.TokenLimit;
@@ -33,84 +43,135 @@ public TokenBucketRateLimiter(TokenBucketRateLimiterOptions options)
3343
}
3444
}
3545

46+
/// <inheritdoc/>
3647
public override int GetAvailablePermits() => _tokenCount;
3748

49+
/// <inheritdoc/>
3850
protected override RateLimitLease AcquireCore(int tokenCount)
3951
{
4052
// These amounts of resources can never be acquired
4153
if (tokenCount > _options.TokenLimit)
4254
{
43-
throw new InvalidOperationException($"{tokenCount} tokens exceeds the token limit of {_options.TokenLimit}.");
55+
throw new ArgumentOutOfRangeException(nameof(tokenCount), $"{tokenCount} tokens exceeds the token limit of {_options.TokenLimit}.");
4456
}
4557

46-
// Return SuccessfulAcquisition or FailedAcquisition depending to indicate limiter state
58+
// Return SuccessfulLease or FailedLease depending to indicate limiter state
4759
if (tokenCount == 0)
4860
{
49-
if (GetAvailablePermits() > 0)
61+
if (_tokenCount > 0)
5062
{
5163
return SuccessfulLease;
5264
}
5365

54-
return CreateFailedTokenLease();
66+
return CreateFailedTokenLease(tokenCount);
5567
}
5668

57-
// These amounts of resources can never be acquired
58-
if (Interlocked.Add(ref _tokenCount, -tokenCount) >= 0)
69+
lock (Lock)
5970
{
60-
return SuccessfulLease;
61-
}
62-
63-
Interlocked.Add(ref _tokenCount, tokenCount);
71+
if (TryLeaseUnsynchronized(tokenCount, out RateLimitLease? lease))
72+
{
73+
return lease;
74+
}
6475

65-
return CreateFailedTokenLease();
76+
return CreateFailedTokenLease(tokenCount);
77+
}
6678
}
6779

80+
/// <inheritdoc/>
6881
protected override ValueTask<RateLimitLease> WaitAsyncCore(int tokenCount, CancellationToken cancellationToken = default)
6982
{
83+
cancellationToken.ThrowIfCancellationRequested();
84+
7085
// These amounts of resources can never be acquired
71-
if (tokenCount < 0 || tokenCount > _options.TokenLimit)
86+
if (tokenCount > _options.TokenLimit)
7287
{
73-
throw new ArgumentOutOfRangeException();
88+
throw new ArgumentOutOfRangeException(nameof(tokenCount), $"{tokenCount} token(s) exceeds the permit limit of {_options.TokenLimit}.");
7489
}
7590

7691
// Return SuccessfulAcquisition if requestedCount is 0 and resources are available
77-
if (tokenCount == 0 && GetAvailablePermits() > 0)
92+
if (tokenCount == 0 && _tokenCount > 0)
7893
{
79-
// Perf: static failed/successful value tasks?
8094
return new ValueTask<RateLimitLease>(SuccessfulLease);
8195
}
8296

83-
if (Interlocked.Add(ref _tokenCount, -tokenCount) >= 0)
97+
lock (Lock)
8498
{
85-
// Perf: static failed/successful value tasks?
86-
return new ValueTask<RateLimitLease>(SuccessfulLease);
87-
}
99+
if (TryLeaseUnsynchronized(tokenCount, out RateLimitLease? lease))
100+
{
101+
return new ValueTask<RateLimitLease>(lease);
102+
}
103+
104+
// Don't queue if queue limit reached
105+
if (_queueCount + tokenCount > _options.QueueLimit)
106+
{
107+
return new ValueTask<RateLimitLease>(CreateFailedTokenLease(tokenCount));
108+
}
88109

89-
Interlocked.Add(ref _tokenCount, tokenCount);
110+
TaskCompletionSource<RateLimitLease> tcs = new TaskCompletionSource<RateLimitLease>(TaskCreationOptions.RunContinuationsAsynchronously);
90111

91-
// Don't queue if queue limit reached
92-
if (_queueCount + tokenCount > _options.QueueLimit)
93-
{
94-
return new ValueTask<RateLimitLease>(CreateFailedTokenLease());
112+
CancellationTokenRegistration ctr;
113+
if (cancellationToken.CanBeCanceled)
114+
{
115+
ctr = cancellationToken.Register(obj =>
116+
{
117+
((TaskCompletionSource<RateLimitLease>)obj).TrySetException(new OperationCanceledException(cancellationToken));
118+
}, tcs);
119+
}
120+
121+
RequestRegistration registration = new RequestRegistration(tokenCount, tcs, ctr);
122+
_queue.EnqueueTail(registration);
123+
_queueCount += tokenCount;
124+
Debug.Assert(_queueCount <= _options.QueueLimit);
125+
126+
// handle cancellation
127+
return new ValueTask<RateLimitLease>(registration.Tcs.Task);
95128
}
129+
}
96130

97-
var registration = new RequestRegistration(tokenCount);
98-
_queue.EnqueueTail(registration);
99-
Interlocked.Add(ref _tokenCount, tokenCount);
131+
private RateLimitLease CreateFailedTokenLease(int tokenCount)
132+
{
133+
int replenishAmount = tokenCount - _tokenCount + _queueCount;
134+
// can't have 0 replenish periods, that would mean it should be a successful lease
135+
// if TokensPerPeriod is larger than the replenishAmount needed then it would be 0
136+
int replenishPeriods = Math.Max(replenishAmount / _options.TokensPerPeriod, 1);
100137

101-
// handle cancellation
102-
return new ValueTask<RateLimitLease>(registration.Tcs.Task);
138+
return new TokenBucketLease(false, TimeSpan.FromTicks(_options.ReplenishmentPeriod.Ticks * replenishPeriods));
103139
}
104140

105-
private RateLimitLease CreateFailedTokenLease()
141+
private bool TryLeaseUnsynchronized(int tokenCount, [NotNullWhen(true)] out RateLimitLease? lease)
106142
{
107-
var replenishAmount = _tokenCount - GetAvailablePermits() + _queueCount;
108-
var replenishPeriods = (replenishAmount / _options.TokensPerPeriod) + 1;
143+
// if permitCount is 0 we want to queue it if there are no available permits
144+
if (_tokenCount >= tokenCount && _tokenCount != 0)
145+
{
146+
if (tokenCount == 0)
147+
{
148+
// Edge case where the check before the lock showed 0 available permits but when we got the lock some permits were now available
149+
lease = SuccessfulLease;
150+
return true;
151+
}
109152

110-
return new TokenBucketLease(false, TimeSpan.FromTicks(_options.ReplenishmentPeriod.Ticks*replenishPeriods));
153+
// a. if there are no items queued we can lease
154+
// b. if there are items queued but the processing order is newest first, then we can lease the incoming request since it is the newest
155+
if (_queueCount == 0 || (_queueCount > 0 && _options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst))
156+
{
157+
_tokenCount -= tokenCount;
158+
Debug.Assert(_tokenCount >= 0);
159+
lease = SuccessfulLease;
160+
return true;
161+
}
162+
}
163+
164+
lease = null;
165+
return false;
111166
}
112167

113-
// Attempts to replenish the bucket, returns triue if enough time has elapsed and it replenishes; otherwise, false.
168+
/// <summary>
169+
/// Attempts to replenish the bucket.
170+
/// </summary>
171+
/// <returns>
172+
/// False if <see cref="TokenBucketRateLimiterOptions.AutoReplenishment"/> is enabled, otherwise true.
173+
/// Does not reflect if tokens were replenished.
174+
/// </returns>
114175
public bool TryReplenish()
115176
{
116177
if (_options.AutoReplenishment)
@@ -123,60 +184,86 @@ public bool TryReplenish()
123184

124185
private static void Replenish(object? state)
125186
{
126-
// Return if Replenish already running to avoid concurrency.
127-
if (!(state is TokenBucketRateLimiter))
128-
{
129-
return;
130-
}
187+
TokenBucketRateLimiter limiter = (state as TokenBucketRateLimiter)!;
188+
Debug.Assert(limiter is not null);
131189

132-
var limiter = (TokenBucketRateLimiter)state;
190+
// Use Environment.TickCount instead of DateTime.UtcNow to avoid issues on systems where the clock can change
191+
uint nowTicks = (uint)Environment.TickCount;
192+
limiter!.ReplenishInternal(nowTicks);
193+
}
133194

134-
var nowTicks = DateTime.Now.Ticks;
135-
// Need to acount for multiple periods. Need to account for ticks right below the replenishment period.
136-
if (nowTicks - limiter._lastReplenishmentTick < limiter._options.ReplenishmentPeriod.Ticks)
195+
// Used in tests that test behavior with specific time intervals
196+
internal void ReplenishInternal(uint nowTicks)
197+
{
198+
bool wrapped = false;
199+
// (uint)TickCount will wrap every ~50 days, we can detect that by checking if the new ticks is less than the last replenishment
200+
if (nowTicks < _lastReplenishmentTick)
137201
{
138-
return;
202+
wrapped = true;
139203
}
140204

141-
limiter._lastReplenishmentTick = nowTicks;
205+
// method is re-entrant (from Timer), lock to avoid multiple simultaneous replenishes
206+
lock (Lock)
207+
{
208+
// Fix the wrapping by using a long and adding uint.MaxValue in the wrapped case
209+
long nonWrappedTicks = wrapped ? (long)nowTicks + uint.MaxValue : nowTicks;
210+
if (nonWrappedTicks - _lastReplenishmentTick < _options.ReplenishmentPeriod.TotalMilliseconds)
211+
{
212+
return;
213+
}
142214

143-
var availablePermits = limiter.GetAvailablePermits();
144-
var options = limiter._options;
145-
var maxPermits = options.TokenLimit;
215+
_lastReplenishmentTick = nowTicks;
146216

147-
if (availablePermits < maxPermits)
148-
{
149-
var resoucesToAdd = Math.Min(options.TokensPerPeriod, maxPermits - availablePermits);
150-
Interlocked.Add(ref limiter._tokenCount, resoucesToAdd);
151-
}
217+
int availablePermits = _tokenCount;
218+
TokenBucketRateLimiterOptions options = _options;
219+
int maxPermits = options.TokenLimit;
220+
int resourcesToAdd;
152221

153-
// Process queued requests
154-
var queue = limiter._queue;
155-
lock (limiter._lock)
156-
{
222+
if (availablePermits < maxPermits)
223+
{
224+
resourcesToAdd = Math.Min(options.TokensPerPeriod, maxPermits - availablePermits);
225+
}
226+
else
227+
{
228+
// All tokens available, nothing to do
229+
return;
230+
}
231+
232+
// Process queued requests
233+
Deque<RequestRegistration> queue = _queue;
234+
235+
_tokenCount += resourcesToAdd;
236+
Debug.Assert(_tokenCount <= _options.TokenLimit);
157237
while (queue.Count > 0)
158238
{
159-
var nextPendingRequest =
239+
RequestRegistration nextPendingRequest =
160240
options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
161241
? queue.PeekHead()
162242
: queue.PeekTail();
163243

164-
if (Interlocked.Add(ref limiter._tokenCount, -nextPendingRequest.Count) >= 0)
244+
if (_tokenCount >= nextPendingRequest.Count)
165245
{
166246
// Request can be fulfilled
167-
var request =
247+
nextPendingRequest =
168248
options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
169249
? queue.DequeueHead()
170250
: queue.DequeueTail();
171-
Interlocked.Add(ref limiter._queueCount, -request.Count);
172251

173-
// requestToFulfill == request
174-
request.Tcs.SetResult(SuccessfulLease);
252+
_queueCount -= nextPendingRequest.Count;
253+
_tokenCount -= nextPendingRequest.Count;
254+
Debug.Assert(_queueCount >= 0);
255+
Debug.Assert(_tokenCount >= 0);
256+
257+
if (!nextPendingRequest.Tcs.TrySetResult(SuccessfulLease))
258+
{
259+
// Queued item was canceled so add count back
260+
_tokenCount += nextPendingRequest.Count;
261+
}
262+
nextPendingRequest.CancellationTokenRegistration.Dispose();
175263
}
176264
else
177265
{
178266
// Request cannot be fulfilled
179-
Interlocked.Add(ref limiter._tokenCount, nextPendingRequest.Count);
180267
break;
181268
}
182269
}
@@ -195,7 +282,15 @@ public TokenBucketLease(bool isAcquired, TimeSpan? retryAfter)
195282

196283
public override bool IsAcquired { get; }
197284

198-
public override IEnumerable<string> MetadataNames => throw new NotImplementedException();
285+
public override IEnumerable<string> MetadataNames => Enumerable();
286+
287+
private IEnumerable<string> Enumerable()
288+
{
289+
if (_retryAfter is not null)
290+
{
291+
yield return MetadataName.RetryAfter.Name;
292+
}
293+
}
199294

200295
public override bool TryGetMetadata(string metadataName, out object? metadata)
201296
{
@@ -205,26 +300,29 @@ public override bool TryGetMetadata(string metadataName, out object? metadata)
205300
return true;
206301
}
207302

208-
metadata = null;
303+
metadata = default;
209304
return false;
210305
}
211306

212307
protected override void Dispose(bool disposing) { }
213308
}
214309

215-
private struct RequestRegistration
310+
private readonly struct RequestRegistration
216311
{
217-
public RequestRegistration(int tokenCount)
312+
public RequestRegistration(int tokenCount, TaskCompletionSource<RateLimitLease> tcs, CancellationTokenRegistration cancellationTokenRegistration)
218313
{
219314
Count = tokenCount;
220315
// Use VoidAsyncOperationWithData<T> instead
221-
Tcs = new TaskCompletionSource<RateLimitLease>(TaskCreationOptions.RunContinuationsAsynchronously);
316+
Tcs = tcs;
317+
CancellationTokenRegistration = cancellationTokenRegistration;
222318
}
223319

224320
public int Count { get; }
225321

226322
public TaskCompletionSource<RateLimitLease> Tcs { get; }
323+
324+
public CancellationTokenRegistration CancellationTokenRegistration { get; }
325+
227326
}
228327
}
229-
#pragma warning restore
230328
}

0 commit comments

Comments
 (0)