Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions Microsoft.Azure.Cosmos/src/Batch/BatchCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,12 @@ public override Task<TransactionalBatchResponse> ExecuteAsync(
CancellationToken cancellationToken = default)
{
return this.container.ClientContext.OperationHelperAsync(
nameof(ExecuteAsync),
requestOptions,
(trace) =>
operationName: nameof(ExecuteAsync),
containerName: this.container.Id,
databaseName: this.container.Database.Id,
operationType: Documents.OperationType.Replace,
requestOptions: requestOptions,
task: (trace) =>
{
BatchExecutor executor = new BatchExecutor(
container: this.container,
Expand All @@ -232,10 +235,8 @@ public override Task<TransactionalBatchResponse> ExecuteAsync(
this.operations = new List<ItemBatchOperation>();
return executor.ExecuteAsync(trace, cancellationToken);
},
(response) => new OpenTelemetryResponse(
responseMessage: response,
containerName: this.container?.Id,
databaseName: this.container?.Database?.Id));
openTelemetry: (response) => new OpenTelemetryResponse(
responseMessage: response));
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed
using Microsoft.Azure.Cosmos.Pagination;
using Microsoft.Azure.Cosmos.Query.Core;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;

internal sealed class ChangeFeedIteratorCore : FeedIteratorInternal
{
Expand Down Expand Up @@ -222,12 +222,12 @@ public ChangeFeedIteratorCore(
public override async Task<ResponseMessage> ReadNextAsync(CancellationToken cancellationToken = default)
{
return await this.clientContext.OperationHelperAsync("Change Feed Iterator Read Next Async",
containerName: this.container?.Id,
databaseName: this.container?.Database?.Id ?? this.databaseName,
operationType: OperationType.ReadFeed,
requestOptions: this.changeFeedRequestOptions,
task: (trace) => this.ReadNextInternalAsync(trace, cancellationToken),
openTelemetry: (response) => new OpenTelemetryResponse(
responseMessage: response,
containerName: this.container?.Id,
databaseName: this.container?.Database?.Id),
openTelemetry: (response) => new OpenTelemetryResponse(responseMessage: response),
traceComponent: TraceComponent.ChangeFeed,
traceLevel: TraceLevel.Info);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,15 @@ public override CosmosElement GetCosmosElementContinuationToken()
/// <returns>A change feed response from cosmos service</returns>
public override Task<ResponseMessage> ReadNextAsync(CancellationToken cancellationToken = default)
{
return this.clientContext.OperationHelperAsync("Change Feed Processor Read Next Async",
return this.clientContext.OperationHelperAsync(
operationName: "Change Feed Processor Read Next Async",
containerName: this.container?.Id,
databaseName: this.container?.Database?.Id ?? this.databaseName,
operationType: Documents.OperationType.ReadFeed,
requestOptions: this.changeFeedOptions,
task: (trace) => this.ReadNextAsync(trace, cancellationToken),
openTelemetry: (response) => new OpenTelemetryResponse(
responseMessage: response,
containerName: this.container?.Id,
databaseName: this.container?.Database?.Id),
responseMessage: response),
traceComponent: TraceComponent.ChangeFeed,
traceLevel: TraceLevel.Info);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,14 @@ private ChangeFeedEstimatorIterator(

public override Task<FeedResponse<ChangeFeedProcessorState>> ReadNextAsync(CancellationToken cancellationToken = default)
{
return this.monitoredContainer.ClientContext.OperationHelperAsync("Change Feed Estimator Read Next Async",
return this.monitoredContainer.ClientContext.OperationHelperAsync(
operationName: "Change Feed Estimator Read Next Async",
containerName: this.monitoredContainer?.Id,
databaseName: this.monitoredContainer?.Database?.Id,
operationType: Documents.OperationType.ReadFeed,
requestOptions: null,
task: (trace) => this.ReadNextAsync(trace, cancellationToken),
openTelemetry: (response) => new OpenTelemetryResponse<ChangeFeedProcessorState>(
responseMessage: response,
containerName: this.monitoredContainer?.Id,
databaseName: this.monitoredContainer?.Database?.Id ?? this.databaseName),
openTelemetry: (response) => new OpenTelemetryResponse<ChangeFeedProcessorState>(responseMessage: response),
traceComponent: TraceComponent.ChangeFeed,
traceLevel: TraceLevel.Info);
}
Expand Down
164 changes: 86 additions & 78 deletions Microsoft.Azure.Cosmos/src/CosmosClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ namespace Microsoft.Azure.Cosmos
using Microsoft.Azure.Cosmos.Handlers;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Query.Core.QueryPlan;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Cosmos.Telemetry;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Cosmos.Tracing.TraceData;
using Microsoft.Azure.Documents;
Expand Down Expand Up @@ -629,9 +627,12 @@ internal CosmosClient(
public virtual Task<AccountProperties> ReadAccountAsync()
{
return this.ClientContext.OperationHelperAsync(
nameof(ReadAccountAsync),
null,
(trace) => ((IDocumentClientInternal)this.DocumentClient).GetDatabaseAccountInternalAsync(this.Endpoint));
operationName: nameof(ReadAccountAsync),
containerName: null,
databaseName: null,
operationType: OperationType.Read,
requestOptions: null,
task: (trace) => ((IDocumentClientInternal)this.DocumentClient).GetDatabaseAccountInternalAsync(this.Endpoint));
}

/// <summary>
Expand Down Expand Up @@ -715,9 +716,12 @@ public virtual Task<DatabaseResponse> CreateDatabaseAsync(
}

return this.ClientContext.OperationHelperAsync(
nameof(CreateDatabaseAsync),
requestOptions,
(trace) =>
operationName: nameof(CreateDatabaseAsync),
containerName: null,
databaseName: id,
operationType: OperationType.Create,
requestOptions: requestOptions,
task: (trace) =>
{
DatabaseProperties databaseProperties = this.PrepareDatabaseProperties(id);
ThroughputProperties throughputProperties = ThroughputProperties.CreateManualThroughput(throughput);
Expand All @@ -729,10 +733,7 @@ public virtual Task<DatabaseResponse> CreateDatabaseAsync(
trace: trace,
cancellationToken: cancellationToken);
},
(response) => new OpenTelemetryResponse<DatabaseProperties>(
responseMessage: response,
containerName: null,
databaseName: response.Resource?.Id));
openTelemetry: (response) => new OpenTelemetryResponse<DatabaseProperties>(responseMessage: response));
}

/// <summary>
Expand Down Expand Up @@ -765,9 +766,12 @@ public virtual Task<DatabaseResponse> CreateDatabaseAsync(
}

return this.ClientContext.OperationHelperAsync(
nameof(CreateDatabaseAsync),
requestOptions,
(trace) =>
operationName: nameof(CreateDatabaseAsync),
containerName: null,
databaseName: id,
operationType: OperationType.Create,
requestOptions: requestOptions,
task: (trace) =>
{
DatabaseProperties databaseProperties = this.PrepareDatabaseProperties(id);
return this.CreateDatabaseInternalAsync(
Expand All @@ -777,10 +781,7 @@ public virtual Task<DatabaseResponse> CreateDatabaseAsync(
trace: trace,
cancellationToken: cancellationToken);
},
(response) => new OpenTelemetryResponse<DatabaseProperties>(
responseMessage: response,
containerName: null,
databaseName: response.Resource?.Id));
openTelemetry: (response) => new OpenTelemetryResponse<DatabaseProperties>(responseMessage: response));
}

/// <summary>
Expand Down Expand Up @@ -824,59 +825,60 @@ public virtual Task<DatabaseResponse> CreateDatabaseIfNotExistsAsync(
return string.IsNullOrEmpty(id)
? throw new ArgumentNullException(nameof(id))
: this.ClientContext.OperationHelperAsync(
nameof(CreateDatabaseIfNotExistsAsync),
requestOptions,
async (trace) =>
{
double totalRequestCharge = 0;
// Doing a Read before Create will give us better latency for existing databases
DatabaseProperties databaseProperties = this.PrepareDatabaseProperties(id);
DatabaseCore database = (DatabaseCore)this.GetDatabase(id);
using (ResponseMessage readResponse = await database.ReadStreamAsync(
operationName: nameof(CreateDatabaseIfNotExistsAsync),
containerName: null,
databaseName: id,
operationType: OperationType.Create,
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken))
{
totalRequestCharge = readResponse.Headers.RequestCharge;
if (readResponse.StatusCode != HttpStatusCode.NotFound)
task: async (trace) =>
{
return this.ClientContext.ResponseFactory.CreateDatabaseResponse(database, readResponse);
}
}

using (ResponseMessage createResponse = await this.CreateDatabaseStreamInternalAsync(
databaseProperties,
throughputProperties,
requestOptions,
trace,
cancellationToken))
{
totalRequestCharge += createResponse.Headers.RequestCharge;
createResponse.Headers.RequestCharge = totalRequestCharge;

if (createResponse.StatusCode != HttpStatusCode.Conflict)
{
return this.ClientContext.ResponseFactory.CreateDatabaseResponse(this.GetDatabase(databaseProperties.Id), createResponse);
}
}

// This second Read is to handle the race condition when 2 or more threads have Read the database and only one succeeds with Create
// so for the remaining ones we should do a Read instead of throwing Conflict exception
using (ResponseMessage readResponseAfterConflict = await database.ReadStreamAsync(
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken))
{
totalRequestCharge += readResponseAfterConflict.Headers.RequestCharge;
readResponseAfterConflict.Headers.RequestCharge = totalRequestCharge;

return this.ClientContext.ResponseFactory.CreateDatabaseResponse(this.GetDatabase(databaseProperties.Id), readResponseAfterConflict);
}
},
(response) => new OpenTelemetryResponse<DatabaseProperties>(
responseMessage: response,
containerName: null,
databaseName: response.Resource?.Id));
double totalRequestCharge = 0;
// Doing a Read before Create will give us better latency for existing databases
DatabaseProperties databaseProperties = this.PrepareDatabaseProperties(id);
DatabaseCore database = (DatabaseCore)this.GetDatabase(id);
using (ResponseMessage readResponse = await database.ReadStreamAsync(
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken))
{
totalRequestCharge = readResponse.Headers.RequestCharge;
if (readResponse.StatusCode != HttpStatusCode.NotFound)
{
return this.ClientContext.ResponseFactory.CreateDatabaseResponse(database, readResponse);
}
}

using (ResponseMessage createResponse = await this.CreateDatabaseStreamInternalAsync(
databaseProperties,
throughputProperties,
requestOptions,
trace,
cancellationToken))
{
totalRequestCharge += createResponse.Headers.RequestCharge;
createResponse.Headers.RequestCharge = totalRequestCharge;

if (createResponse.StatusCode != HttpStatusCode.Conflict)
{
return this.ClientContext.ResponseFactory.CreateDatabaseResponse(this.GetDatabase(databaseProperties.Id), createResponse);
}
}

// This second Read is to handle the race condition when 2 or more threads have Read the database and only one succeeds with Create
// so for the remaining ones we should do a Read instead of throwing Conflict exception
using (ResponseMessage readResponseAfterConflict = await database.ReadStreamAsync(
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken))
{
totalRequestCharge += readResponseAfterConflict.Headers.RequestCharge;
readResponseAfterConflict.Headers.RequestCharge = totalRequestCharge;

return this.ClientContext.ResponseFactory.CreateDatabaseResponse(this.GetDatabase(databaseProperties.Id), readResponseAfterConflict);
}
},
openTelemetry: (response) => new OpenTelemetryResponse<DatabaseProperties>(
responseMessage: response));
}

/// <summary>
Expand Down Expand Up @@ -1165,9 +1167,12 @@ public virtual Task<ResponseMessage> CreateDatabaseStreamAsync(
}

return this.ClientContext.OperationHelperAsync(
nameof(CreateDatabaseStreamAsync),
requestOptions,
(trace) =>
operationName: nameof(CreateDatabaseStreamAsync),
containerName: null,
databaseName: databaseProperties.Id,
operationType: OperationType.Create,
requestOptions: requestOptions,
task: (trace) =>
{
this.ClientContext.ValidateResource(databaseProperties.Id);
return this.CreateDatabaseStreamInternalAsync(
Expand All @@ -1177,7 +1182,7 @@ public virtual Task<ResponseMessage> CreateDatabaseStreamAsync(
trace,
cancellationToken);
},
(response) => new OpenTelemetryResponse(response));
openTelemetry: (response) => new OpenTelemetryResponse(response));
}

/// <summary>
Expand Down Expand Up @@ -1260,9 +1265,12 @@ internal virtual Task<ResponseMessage> CreateDatabaseStreamAsync(
}

return this.ClientContext.OperationHelperAsync(
nameof(CreateDatabaseIfNotExistsAsync),
requestOptions,
(trace) =>
operationName: nameof(CreateDatabaseIfNotExistsAsync),
containerName: null,
databaseName: databaseProperties.Id,
operationType: OperationType.Create,
requestOptions: requestOptions,
task: (trace) =>
{
this.ClientContext.ValidateResource(databaseProperties.Id);
return this.CreateDatabaseStreamInternalAsync(
Expand All @@ -1272,7 +1280,7 @@ internal virtual Task<ResponseMessage> CreateDatabaseStreamAsync(
trace,
cancellationToken);
},
(response) => new OpenTelemetryResponse(response));
openTelemetry: (response) => new OpenTelemetryResponse(response));
}

private async Task<DatabaseResponse> CreateDatabaseInternalAsync(
Expand Down
Loading