Skip to content

TokenBucketRateLimiter implementation #390

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Nov 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/RateLimiting/src/ConcurrencyLimiter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ protected override ValueTask<RateLimitLease> WaitAsyncCore(int permitCount, Canc
throw new ArgumentOutOfRangeException(nameof(permitCount), $"{permitCount} permits exceeds the permit limit of {_options.PermitLimit}.");
}

// Return SuccessfulAcquisition if requestedCount is 0 and resources are available
// Return SuccessfulLease if requestedCount is 0 and resources are available
if (permitCount == 0 && _permitCount > 0)
{
return new ValueTask<RateLimitLease>(SuccessfulLease);
Expand Down
4 changes: 4 additions & 0 deletions src/RateLimiting/src/System.Threading.RateLimiting.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,8 @@
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.4" />
</ItemGroup>

<ItemGroup>
<InternalsVisibleTo Include="System.Threading.RateLimiting.Tests" />
</ItemGroup>

</Project>
244 changes: 171 additions & 73 deletions src/RateLimiting/src/TokenBucketRateLimiter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,34 @@
// Pending dotnet API review

using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks;

namespace System.Threading.RateLimiting
{
#pragma warning disable 1591
/// <summary>
/// <see cref="RateLimiter"/> implementation that replenishes tokens periodically instead of via a release mechanism.
/// </summary>
public sealed class TokenBucketRateLimiter : RateLimiter
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be disposable because of the _renewTimer? Even if AutoReplenishment is disabled, we probably would want to cancel all queued acquisition request because I doubt anyone will be calling TryReplenish() after disposal and we'll want to cancel all the Tasks and dispose all the CancellationTokenRegistrations.

{
private int _tokenCount;
private int _queueCount;
private long _lastReplenishmentTick;
private uint _lastReplenishmentTick = (uint)Environment.TickCount;

private readonly Timer? _renewTimer;
private readonly object _lock = new object();
private readonly TokenBucketRateLimiterOptions _options;
private readonly Deque<RequestRegistration> _queue = new Deque<RequestRegistration>();

// Use the queue as the lock field so we don't need to allocate another object for a lock and have another field in the object
private object Lock => _queue;

private static readonly RateLimitLease SuccessfulLease = new TokenBucketLease(true, null);

/// <summary>
/// Initializes the <see cref="TokenBucketRateLimiter"/>.
/// </summary>
/// <param name="options">Options to specify the behavior of the <see cref="TokenBucketRateLimiter"/>.</param>
public TokenBucketRateLimiter(TokenBucketRateLimiterOptions options)
{
_tokenCount = options.TokenLimit;
Expand All @@ -33,84 +43,135 @@ public TokenBucketRateLimiter(TokenBucketRateLimiterOptions options)
}
}

/// <inheritdoc/>
public override int GetAvailablePermits() => _tokenCount;

/// <inheritdoc/>
protected override RateLimitLease AcquireCore(int tokenCount)
{
// These amounts of resources can never be acquired
if (tokenCount > _options.TokenLimit)
{
throw new InvalidOperationException($"{tokenCount} tokens exceeds the token limit of {_options.TokenLimit}.");
throw new ArgumentOutOfRangeException(nameof(tokenCount), $"{tokenCount} tokens exceeds the token limit of {_options.TokenLimit}.");
}

// Return SuccessfulAcquisition or FailedAcquisition depending to indicate limiter state
// Return SuccessfulLease or FailedLease depending to indicate limiter state
if (tokenCount == 0)
{
if (GetAvailablePermits() > 0)
if (_tokenCount > 0)
{
return SuccessfulLease;
}

return CreateFailedTokenLease();
return CreateFailedTokenLease(tokenCount);
}

// These amounts of resources can never be acquired
if (Interlocked.Add(ref _tokenCount, -tokenCount) >= 0)
lock (Lock)
{
return SuccessfulLease;
}

Interlocked.Add(ref _tokenCount, tokenCount);
if (TryLeaseUnsynchronized(tokenCount, out RateLimitLease? lease))
{
return lease;
}

return CreateFailedTokenLease();
return CreateFailedTokenLease(tokenCount);
}
}

/// <inheritdoc/>
protected override ValueTask<RateLimitLease> WaitAsyncCore(int tokenCount, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();

// These amounts of resources can never be acquired
if (tokenCount < 0 || tokenCount > _options.TokenLimit)
if (tokenCount > _options.TokenLimit)
{
throw new ArgumentOutOfRangeException();
throw new ArgumentOutOfRangeException(nameof(tokenCount), $"{tokenCount} token(s) exceeds the permit limit of {_options.TokenLimit}.");
}

// Return SuccessfulAcquisition if requestedCount is 0 and resources are available
if (tokenCount == 0 && GetAvailablePermits() > 0)
if (tokenCount == 0 && _tokenCount > 0)
{
// Perf: static failed/successful value tasks?
return new ValueTask<RateLimitLease>(SuccessfulLease);
}

if (Interlocked.Add(ref _tokenCount, -tokenCount) >= 0)
lock (Lock)
{
// Perf: static failed/successful value tasks?
return new ValueTask<RateLimitLease>(SuccessfulLease);
}
if (TryLeaseUnsynchronized(tokenCount, out RateLimitLease? lease))
{
return new ValueTask<RateLimitLease>(lease);
}

// Don't queue if queue limit reached
if (_queueCount + tokenCount > _options.QueueLimit)
{
return new ValueTask<RateLimitLease>(CreateFailedTokenLease(tokenCount));
}

Interlocked.Add(ref _tokenCount, tokenCount);
TaskCompletionSource<RateLimitLease> tcs = new TaskCompletionSource<RateLimitLease>(TaskCreationOptions.RunContinuationsAsynchronously);

// Don't queue if queue limit reached
if (_queueCount + tokenCount > _options.QueueLimit)
{
return new ValueTask<RateLimitLease>(CreateFailedTokenLease());
CancellationTokenRegistration ctr;
if (cancellationToken.CanBeCanceled)
{
ctr = cancellationToken.Register(obj =>
{
((TaskCompletionSource<RateLimitLease>)obj).TrySetException(new OperationCanceledException(cancellationToken));
}, tcs);
}

RequestRegistration registration = new RequestRegistration(tokenCount, tcs, ctr);
_queue.EnqueueTail(registration);
_queueCount += tokenCount;
Debug.Assert(_queueCount <= _options.QueueLimit);

// handle cancellation
return new ValueTask<RateLimitLease>(registration.Tcs.Task);
}
}

var registration = new RequestRegistration(tokenCount);
_queue.EnqueueTail(registration);
Interlocked.Add(ref _tokenCount, tokenCount);
private RateLimitLease CreateFailedTokenLease(int tokenCount)
{
int replenishAmount = tokenCount - _tokenCount + _queueCount;
// can't have 0 replenish periods, that would mean it should be a successful lease
// if TokensPerPeriod is larger than the replenishAmount needed then it would be 0
int replenishPeriods = Math.Max(replenishAmount / _options.TokensPerPeriod, 1);

// handle cancellation
return new ValueTask<RateLimitLease>(registration.Tcs.Task);
return new TokenBucketLease(false, TimeSpan.FromTicks(_options.ReplenishmentPeriod.Ticks * replenishPeriods));
}

private RateLimitLease CreateFailedTokenLease()
private bool TryLeaseUnsynchronized(int tokenCount, [NotNullWhen(true)] out RateLimitLease? lease)
{
var replenishAmount = _tokenCount - GetAvailablePermits() + _queueCount;
var replenishPeriods = (replenishAmount / _options.TokensPerPeriod) + 1;
// if permitCount is 0 we want to queue it if there are no available permits
if (_tokenCount >= tokenCount && _tokenCount != 0)
{
if (tokenCount == 0)
{
// Edge case where the check before the lock showed 0 available permits but when we got the lock some permits were now available
lease = SuccessfulLease;
return true;
}

return new TokenBucketLease(false, TimeSpan.FromTicks(_options.ReplenishmentPeriod.Ticks*replenishPeriods));
// a. if there are no items queued we can lease
// b. if there are items queued but the processing order is newest first, then we can lease the incoming request since it is the newest
if (_queueCount == 0 || (_queueCount > 0 && _options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst))
{
_tokenCount -= tokenCount;
Debug.Assert(_tokenCount >= 0);
lease = SuccessfulLease;
return true;
}
}

lease = null;
return false;
}

// Attempts to replenish the bucket, returns triue if enough time has elapsed and it replenishes; otherwise, false.
/// <summary>
/// Attempts to replenish the bucket.
/// </summary>
/// <returns>
/// False if <see cref="TokenBucketRateLimiterOptions.AutoReplenishment"/> is enabled, otherwise true.
/// Does not reflect if tokens were replenished.
/// </returns>
public bool TryReplenish()
{
if (_options.AutoReplenishment)
Expand All @@ -123,60 +184,86 @@ public bool TryReplenish()

private static void Replenish(object? state)
{
// Return if Replenish already running to avoid concurrency.
if (!(state is TokenBucketRateLimiter))
{
return;
}
TokenBucketRateLimiter limiter = (state as TokenBucketRateLimiter)!;
Debug.Assert(limiter is not null);

var limiter = (TokenBucketRateLimiter)state;
// Use Environment.TickCount instead of DateTime.UtcNow to avoid issues on systems where the clock can change
uint nowTicks = (uint)Environment.TickCount;
limiter!.ReplenishInternal(nowTicks);
}

var nowTicks = DateTime.Now.Ticks;
// Need to acount for multiple periods. Need to account for ticks right below the replenishment period.
if (nowTicks - limiter._lastReplenishmentTick < limiter._options.ReplenishmentPeriod.Ticks)
// Used in tests that test behavior with specific time intervals
internal void ReplenishInternal(uint nowTicks)
{
bool wrapped = false;
// (uint)TickCount will wrap every ~50 days, we can detect that by checking if the new ticks is less than the last replenishment
if (nowTicks < _lastReplenishmentTick)
{
return;
wrapped = true;
}

limiter._lastReplenishmentTick = nowTicks;
// method is re-entrant (from Timer), lock to avoid multiple simultaneous replenishes
lock (Lock)
{
// Fix the wrapping by using a long and adding uint.MaxValue in the wrapped case
long nonWrappedTicks = wrapped ? (long)nowTicks + uint.MaxValue : nowTicks;
if (nonWrappedTicks - _lastReplenishmentTick < _options.ReplenishmentPeriod.TotalMilliseconds)
{
return;
}

var availablePermits = limiter.GetAvailablePermits();
var options = limiter._options;
var maxPermits = options.TokenLimit;
_lastReplenishmentTick = nowTicks;

if (availablePermits < maxPermits)
{
var resoucesToAdd = Math.Min(options.TokensPerPeriod, maxPermits - availablePermits);
Interlocked.Add(ref limiter._tokenCount, resoucesToAdd);
}
int availablePermits = _tokenCount;
TokenBucketRateLimiterOptions options = _options;
int maxPermits = options.TokenLimit;
int resourcesToAdd;

// Process queued requests
var queue = limiter._queue;
lock (limiter._lock)
{
if (availablePermits < maxPermits)
{
resourcesToAdd = Math.Min(options.TokensPerPeriod, maxPermits - availablePermits);
}
else
{
// All tokens available, nothing to do
return;
}

// Process queued requests
Deque<RequestRegistration> queue = _queue;

_tokenCount += resourcesToAdd;
Debug.Assert(_tokenCount <= _options.TokenLimit);
while (queue.Count > 0)
{
var nextPendingRequest =
RequestRegistration nextPendingRequest =
options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? queue.PeekHead()
: queue.PeekTail();

if (Interlocked.Add(ref limiter._tokenCount, -nextPendingRequest.Count) >= 0)
if (_tokenCount >= nextPendingRequest.Count)
{
// Request can be fulfilled
var request =
nextPendingRequest =
options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? queue.DequeueHead()
: queue.DequeueTail();
Interlocked.Add(ref limiter._queueCount, -request.Count);

// requestToFulfill == request
request.Tcs.SetResult(SuccessfulLease);
_queueCount -= nextPendingRequest.Count;
_tokenCount -= nextPendingRequest.Count;
Debug.Assert(_queueCount >= 0);
Debug.Assert(_tokenCount >= 0);

if (!nextPendingRequest.Tcs.TrySetResult(SuccessfulLease))
{
// Queued item was canceled so add count back
_tokenCount += nextPendingRequest.Count;
}
nextPendingRequest.CancellationTokenRegistration.Dispose();
}
else
{
// Request cannot be fulfilled
Interlocked.Add(ref limiter._tokenCount, nextPendingRequest.Count);
break;
}
}
Expand All @@ -195,7 +282,15 @@ public TokenBucketLease(bool isAcquired, TimeSpan? retryAfter)

public override bool IsAcquired { get; }

public override IEnumerable<string> MetadataNames => throw new NotImplementedException();
public override IEnumerable<string> MetadataNames => Enumerable();

private IEnumerable<string> Enumerable()
{
if (_retryAfter is not null)
{
yield return MetadataName.RetryAfter.Name;
}
}

public override bool TryGetMetadata(string metadataName, out object? metadata)
{
Expand All @@ -205,26 +300,29 @@ public override bool TryGetMetadata(string metadataName, out object? metadata)
return true;
}

metadata = null;
metadata = default;
return false;
}

protected override void Dispose(bool disposing) { }
}

private struct RequestRegistration
private readonly struct RequestRegistration
{
public RequestRegistration(int tokenCount)
public RequestRegistration(int tokenCount, TaskCompletionSource<RateLimitLease> tcs, CancellationTokenRegistration cancellationTokenRegistration)
{
Count = tokenCount;
// Use VoidAsyncOperationWithData<T> instead
Tcs = new TaskCompletionSource<RateLimitLease>(TaskCreationOptions.RunContinuationsAsynchronously);
Tcs = tcs;
CancellationTokenRegistration = cancellationTokenRegistration;
}

public int Count { get; }

public TaskCompletionSource<RateLimitLease> Tcs { get; }

public CancellationTokenRegistration CancellationTokenRegistration { get; }

}
}
#pragma warning restore
}
Loading