Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 83 additions & 25 deletions Microsoft.Azure.Cosmos/src/Routing/AsyncCacheNonBlocking.cs
Original file line number Diff line number Diff line change
Expand Up @@ -183,17 +183,26 @@ public bool TryRemove(TKey key)
/// </summary>
/// <param name="key">The requested key to be refreshed.</param>
/// <param name="singleValueInitFunc">A func delegate to be invoked at a later point of time.</param>
public async Task RefreshAsync(
public void Refresh(
TKey key,
Func<TValue, Task<TValue>> singleValueInitFunc)
{
if (this.values.TryGetValue(key, out AsyncLazyWithRefreshTask<TValue> initialLazyValue))
{
await this.UpdateCacheAndGetValueFromBackgroundTaskAsync(
key: key,
initialValue: initialLazyValue,
callbackDelegate: singleValueInitFunc,
operationName: nameof(RefreshAsync));
Task backgroundRefreshTask = initialLazyValue.RefreshAsync(
createRefreshTask: singleValueInitFunc);

Task continuationTask = backgroundRefreshTask.ContinueWith(task =>
{
if (task.IsFaulted)
{
this.RemoveKeyForBackgroundExceptions(
key,
initialLazyValue,
task?.Exception?.InnerException,
nameof(Refresh));
}
});
}
}

Expand All @@ -218,26 +227,46 @@ private async Task<TValue> UpdateCacheAndGetValueFromBackgroundTaskAsync(
}
catch (Exception ex)
{
if (initialValue.ShouldRemoveFromCacheThreadSafe())
{
bool removed = false;
this.RemoveKeyForBackgroundExceptions(
key,
initialValue,
ex,
operationName);

// In some scenarios when a background failure occurs like a 404
// the initial cache value should be removed.
if (this.removeFromCacheOnBackgroundRefreshException(ex))
{
removed = this.TryRemove(key);
}
throw;
}
}

/// <summary>
/// Removes the specified key from the async non blocking cache for specific exception types.
/// </summary>
/// <param name="key">The requested key to be updated.</param>
/// <param name="initialValue">An instance of <see cref="AsyncLazyWithRefreshTask{T}"/> containing the initial cached value.</param>
/// <param name="ex">A func callback delegate to be invoked at a later point of time.</param>
/// <param name="operationName">A string indicating the operation on the cache.</param>
private void RemoveKeyForBackgroundExceptions(
TKey key,
AsyncLazyWithRefreshTask<TValue> initialValue,
Exception ex,
string operationName)
{
if (initialValue.ShouldRemoveFromCacheThreadSafe())
{
bool removed = false;

DefaultTrace.TraceError(
"AsyncCacheNonBlocking Failed. key: {0}, operation: {1}, tryRemoved: {2}, Exception: {3}",
key,
operationName,
removed,
ex);
// In some scenarios when a background failure occurs like a 404
// the initial cache value should be removed.
if (this.removeFromCacheOnBackgroundRefreshException(ex))
{
removed = this.TryRemove(key);
}

throw;
DefaultTrace.TraceError(
"AsyncCacheNonBlocking Failed. key: {0}, operation: {1}, tryRemoved: {2}, Exception: {3}",
key,
operationName,
removed,
ex);
}
}

Expand All @@ -250,8 +279,10 @@ private sealed class AsyncLazyWithRefreshTask<T>
{
private readonly CancellationToken cancellationToken;
private readonly Func<T, Task<T>> createValueFunc;
private readonly object valueLock = new object();
private readonly object removedFromCacheLock = new object();
private readonly object valueLock = new ();
private readonly object removedFromCacheLock = new ();
private readonly object refreshLock = new ();
private Task<T> backgroundRefreshTask;

private bool removedFromCache = false;
private Task<T> value;
Expand All @@ -265,6 +296,7 @@ public AsyncLazyWithRefreshTask(
this.createValueFunc = null;
this.value = Task.FromResult(value);
this.refreshInProgress = null;
this.backgroundRefreshTask = null;
}

public AsyncLazyWithRefreshTask(
Expand All @@ -275,6 +307,7 @@ public AsyncLazyWithRefreshTask(
this.createValueFunc = taskFactory;
this.value = null;
this.refreshInProgress = null;
this.backgroundRefreshTask = null;
}

public bool IsValueCreated => this.value != null;
Expand Down Expand Up @@ -386,14 +419,39 @@ public bool ShouldRemoveFromCacheThreadSafe()
}
}

public Task RefreshAsync(
Func<T, Task<T>> createRefreshTask)
{
lock (this.refreshLock)
{
if (AsyncLazyWithRefreshTask<T>.IsTaskRunning(this.backgroundRefreshTask))
{
DefaultTrace.TraceInformation(
message: "Background refresh task is already in progress, skip creating a new one.");

return Task.CompletedTask;
}
else
{
DefaultTrace.TraceInformation(
message: "Started a new background refresh task.");

this.backgroundRefreshTask = Task.Run(async () => await this.CreateAndWaitForBackgroundRefreshTaskAsync(
createRefreshTask));

return this.backgroundRefreshTask;
}
}
}

private static bool IsTaskRunning(Task t)
{
if (t == null)
{
return false;
}

return !t.IsCompleted;
return !t.IsCompleted && !t.IsFaulted;
}
}

Expand Down
53 changes: 34 additions & 19 deletions Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ internal class GatewayAddressCache : IAddressCache, IDisposable
private Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> masterPartitionAddressCache;
private DateTime suboptimalMasterPartitionTimestamp;
private bool disposedValue;
private bool validateUnknownReplicas;
private IOpenConnectionsHandler openConnectionsHandler;

public GatewayAddressCache(
Expand Down Expand Up @@ -92,6 +93,7 @@ public GatewayAddressCache(

this.openConnectionsHandler = openConnectionsHandler;
this.isReplicaAddressValidationEnabled = replicaAddressValidationEnabled;
this.validateUnknownReplicas = false;
}

public Uri ServiceEndpoint => this.serviceEndpoint;
Expand Down Expand Up @@ -120,6 +122,14 @@ public async Task OpenConnectionsAsync(
List<Task> tasks = new ();
int batchSize = GatewayAddressCache.DefaultBatchSize;

// By design, the Unknown replicas are validated only when the following two conditions meet:
// 1) The CosmosClient is initiated using the CreateAndInitializaAsync() flow.
// 2) The advanced replica selection feature enabled.
if (shouldOpenRntbdChannels)
{
this.validateUnknownReplicas = true;
}

#if !(NETSTANDARD15 || NETSTANDARD16)
#if NETSTANDARD20
// GetEntryAssembly returns null when loaded from native netstandard2.0
Expand Down Expand Up @@ -302,27 +312,24 @@ public async Task<PartitionAddressInformation> TryGetAddressesAsync(
.ReplicaTransportAddressUris
.Any(x => x.ShouldRefreshHealthStatus()))
{
Task refreshAddressesInBackgroundTask = Task.Run(async () =>
try
{
try
{
await this.serverPartitionAddressCache.RefreshAsync(
key: partitionKeyRangeIdentity,
singleValueInitFunc: (currentCachedValue) => this.GetAddressesForRangeIdAsync(
this.serverPartitionAddressCache.Refresh(
key: partitionKeyRangeIdentity,
singleValueInitFunc: (currentCachedValue) => this.GetAddressesForRangeIdAsync(
request,
cachedAddresses: currentCachedValue,
partitionKeyRangeIdentity.CollectionRid,
partitionKeyRangeIdentity.PartitionKeyRangeId,
forceRefresh: true));
}
catch (Exception ex)
{
DefaultTrace.TraceWarning("Failed to refresh addresses in the background for the collection rid: {0} with exception: {1}. '{2}'",
partitionKeyRangeIdentity.CollectionRid,
ex,
System.Diagnostics.Trace.CorrelationManager.ActivityId);
}
});
}
catch (Exception ex)
{
DefaultTrace.TraceWarning("Failed to refresh addresses in the background for the collection rid: {0} with exception: {1}. '{2}'",
partitionKeyRangeIdentity.CollectionRid,
ex,
System.Diagnostics.Trace.CorrelationManager.ActivityId);
}
}

return addresses;
Expand Down Expand Up @@ -1008,18 +1015,26 @@ private static PartitionAddressInformation MergeAddresses(
/// Returns a list of <see cref="TransportAddressUri"/> needed to validate their health status. Validating
/// a uri is done by opening Rntbd connection to the backend replica, which is a costly operation by nature. Therefore
/// vaidating both Unhealthy and Unknown replicas at the same time could impose a high CPU utilization. To avoid this
/// situation, the RntbdOpenConnectionHandler has good concurrency control mechanism to open the connections gracefully/>.
/// situation, the RntbdOpenConnectionHandler has good concurrency control mechanism to open the connections gracefully.
/// By default, this method only returns the Unhealthy replicas that requires to validate it's connectivity status. The
/// Unknown replicas are validated only when the CosmosClient is initiated using the CreateAndInitializaAsync() flow.
/// </summary>
/// <param name="transportAddresses">A read only list of <see cref="TransportAddressUri"/>s.</param>
/// <returns>A list of <see cref="TransportAddressUri"/> that needs to validate their status.</returns>
private IEnumerable<TransportAddressUri> GetAddressesNeededToValidateStatus(
IReadOnlyList<TransportAddressUri> transportAddresses)
{
return transportAddresses
.Where(address => address
return this.validateUnknownReplicas
? transportAddresses
.Where(address => address
.GetCurrentHealthState()
.GetHealthStatus() is
TransportAddressHealthState.HealthStatus.UnhealthyPending or
TransportAddressHealthState.HealthStatus.Unknown)
: transportAddresses
.Where(address => address
.GetCurrentHealthState()
.GetHealthStatus() is
TransportAddressHealthState.HealthStatus.Unknown or
TransportAddressHealthState.HealthStatus.UnhealthyPending);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ public async Task TestGoneFromServiceScenarioAsync(
"44444444444444444",
};

HttpResponseMessage replicaSet1 = MockSetupsHelper.CreateAddresses(
replicaIds1,
partitionKeyRanges.First(),
"eastus",
cRid);
HttpResponseMessage replicaSet1 = MockSetupsHelper.CreateAddresses(
replicaIds1,
partitionKeyRanges.First(),
"eastus",
cRid);

// One replica changed on the refresh
List<string> replicaIds2 = new List<string>()
Expand Down Expand Up @@ -176,6 +176,10 @@ public async Task TestGoneFromServiceScenarioAsync(
mockTransportClient.VerifyAll();
mockHttpHandler.VerifyAll();

mockTransportClient
.Setup(x => x.OpenConnectionAsync(It.IsAny<Uri>()))
.Returns(Task.CompletedTask);

Documents.TransportAddressUri failedReplica = urisVisited.First();

// With replica validation enabled in preview mode, the failed replica will be validated as a part of the flow,
Expand Down
Loading