Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public static OpenTelemetryBuilder UseConsoleActivityExporter(this OpenTelemetry
var exporterOptions = new ConsoleActivityExporterOptions();
configure(exporterOptions);
var consoleExporter = new ConsoleActivityExporter(exporterOptions);
return builder.SetProcessorPipeline(pipeline => pipeline.SetExporter(consoleExporter));
return builder.AddProcessorPipeline(pipeline => pipeline.SetExporter(consoleExporter));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public static OpenTelemetryBuilder UseJaegerActivityExporter(this OpenTelemetryB
throw new ArgumentNullException(nameof(configure));
}

return builder.SetProcessorPipeline(pipeline =>
return builder.AddProcessorPipeline(pipeline =>
{
var exporterOptions = new JaegerExporterOptions();
configure(exporterOptions);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// <copyright file="TracerBuilderExtensions.cs" company="OpenTelemetry Authors">
// <copyright file="TracerBuilderExtensions.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -102,7 +102,7 @@ public static OpenTelemetryBuilder UseOpenTelemetryProtocolActivityExporter(this
throw new ArgumentNullException(nameof(configure));
}

return builder.SetProcessorPipeline(pipeline =>
return builder.AddProcessorPipeline(pipeline =>
{
var exporterOptions = new ExporterOptions();
configure(exporterOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ internal ActivityProcessor Build()
}
else if (this.Exporter != null)
{
// TODO: Make this BatchingActivityProcessor once its available.
exportingProcessor = new SimpleActivityProcessor(this.Exporter);
this.Processors.Add(exportingProcessor);
}
Expand Down
11 changes: 8 additions & 3 deletions src/OpenTelemetry/Trace/Configuration/OpenTelemetryBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ internal OpenTelemetryBuilder()
{
}

internal ActivityProcessorPipelineBuilder ProcessingPipeline { get; private set; }
internal List<ActivityProcessorPipelineBuilder> ProcessingPipelines { get; private set; }

internal List<InstrumentationFactory> InstrumentationFactories { get; private set; }

Expand All @@ -42,16 +42,21 @@ internal OpenTelemetryBuilder()
/// </summary>
/// <param name="configure">Function that configures pipeline.</param>
/// <returns>Returns <see cref="OpenTelemetryBuilder"/> for chaining.</returns>
public OpenTelemetryBuilder SetProcessorPipeline(Action<ActivityProcessorPipelineBuilder> configure)
public OpenTelemetryBuilder AddProcessorPipeline(Action<ActivityProcessorPipelineBuilder> configure)
{
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}

if (this.ProcessingPipelines == null)
{
this.ProcessingPipelines = new List<ActivityProcessorPipelineBuilder>();
}

var pipelineBuilder = new ActivityProcessorPipelineBuilder();
configure(pipelineBuilder);
this.ProcessingPipeline = pipelineBuilder;
this.ProcessingPipelines.Add(pipelineBuilder);
return this;
}

Expand Down
21 changes: 19 additions & 2 deletions src/OpenTelemetry/Trace/Configuration/OpenTelemetrySdk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using OpenTelemetry.Trace.Export;
using OpenTelemetry.Trace.Export.Internal;
using OpenTelemetry.Trace.Samplers;

namespace OpenTelemetry.Trace.Configuration
Expand Down Expand Up @@ -54,14 +56,29 @@ public static OpenTelemetrySdk EnableOpenTelemetry(Action<OpenTelemetryBuilder>
ActivitySampler sampler = openTelemetryBuilder.Sampler ?? new AlwaysOnActivitySampler();

ActivityProcessor activityProcessor;
if (openTelemetryBuilder.ProcessingPipeline == null)
if (openTelemetryBuilder.ProcessingPipelines == null || !openTelemetryBuilder.ProcessingPipelines.Any())
{
// if there are no pipelines are configured, use noop processor
activityProcessor = new NoopActivityProcessor();
}
else if (openTelemetryBuilder.ProcessingPipelines.Count == 1)
{
// if there is only one pipeline - use it's outer processor as a
// single processor on the tracerSdk.
var processorFactory = openTelemetryBuilder.ProcessingPipelines[0];
activityProcessor = processorFactory.Build();
}
else
{
activityProcessor = openTelemetryBuilder.ProcessingPipeline.Build();
// if there are more pipelines, use processor that will broadcast to all pipelines
var processors = new ActivityProcessor[openTelemetryBuilder.ProcessingPipelines.Count];

for (int i = 0; i < openTelemetryBuilder.ProcessingPipelines.Count; i++)
{
processors[i] = openTelemetryBuilder.ProcessingPipelines[i].Build();
}

activityProcessor = new BroadcastActivityProcessor(processors);
}

var activitySource = new ActivitySourceAdapter(sampler, activityProcessor);
Expand Down
105 changes: 105 additions & 0 deletions src/OpenTelemetry/Trace/Export/Internal/BroadcastActivityProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// <copyright file="BroadcastActivityProcessor.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Trace.Export.Internal
{
internal class BroadcastActivityProcessor : ActivityProcessor, IDisposable
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name is misleading since "broadcast" normally means there is no need to specify the target.

Probably borrow from Java/Python:

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FanOutActivityProcessor? 🤷

{
private readonly IEnumerable<ActivityProcessor> processors;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential perf concern:
IEnumerable<T> implies a new enumerator object creation on the hot path (each span will result in a foreach (var processor in this.processors).
Probably worth considering array or list (which maintains the count and can be accessed via index).


public BroadcastActivityProcessor(IEnumerable<ActivityProcessor> processors)
{
if (processors == null)
{
throw new ArgumentNullException(nameof(processors));
}

if (!processors.Any())
{
throw new ArgumentException($"{nameof(processors)} collection is empty");
}

this.processors = processors;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a shallow copy? What if the passed in processors got updated later?

}

public override void OnEnd(Activity activity)
{
foreach (var processor in this.processors)
{
try
{
processor.OnEnd(activity);
}
catch (Exception e)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException("OnEnd", e);
}
}
}

public override void OnStart(Activity activity)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Probably more common to have Start before End.

{
foreach (var processor in this.processors)
{
try
{
processor.OnStart(activity);
}
catch (Exception e)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException("OnStart", e);
}
}
}

public override Task ShutdownAsync(CancellationToken cancellationToken)
{
var tasks = new List<Task>();
foreach (var processor in this.processors)
{
tasks.Add(processor.ShutdownAsync(cancellationToken));
}

return Task.WhenAll(tasks);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cancellationToken is passed to all ShutdownAsync but if one misbehaves this line can wait forever. Is this relying on cancellation by the caller?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current code is vulnerable to one misbehaving processor. Not just in shutdown but even in start/stop as well. We need to address it properly.

}

public void Dispose()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO because it's internal it's fine as-is. That being said, OpenTelemetrySdk isn't calling shutdown or dispose on the ActivityProcessor it is maintaining. That's a problem. We need to flush out spans & perform cleanup on app shutdown right? I think OpenTelemetrySdk.Dispose should call ActivityProcessor.Dispose which should invoke ActivityProcessor.ShutdownAsync? We might want to look at using IAsyncDisposable for that.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are few things we need to address:

  1. Improvements to SimpleSpanProcessor. #674 SimpleSpanProcessor needs to be a synchronous call (e.g. when the control is returned to the caller, the span got processed - current it is not guaranteed).
  2. The life-cycle management of processors/exporters, related to this comment. For example, if we have the same processor/exporter registered twice, would we flush the exporter on the first OnEnd, or maintain a refcount and call OnEnd during Dispose? Does OnEnd need to be ref-counted as well?
  3. We need to have a deterministic flush semantic. By default I would imagine we want to flush traces/logs before app shutdown, metrics is something debatable. And we need to give option to the user whether they want to wait indefinitely or have a timeout or no wait at all.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with all of the above - But my goal with this PR is to just mimic the existing behavior with Spans (this pr was made with pure 'copy paste & minor adjustments' ). I'd prefer to get the parity 1st, then fix underlying issues.

Let me know if this approach is okay?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, these are not blockers for this PR.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO because it's internal it's fine as-is. That being said, OpenTelemetrySdk isn't calling shutdown or dispose on the ActivityProcessor it is maintaining. That's a problem. We need to flush out spans & perform cleanup on app shutdown right? I think OpenTelemetrySdk.Dispose should call ActivityProcessor.Dispose which should invoke ActivityProcessor.ShutdownAsync? We might want to look at using IAsyncDisposable for that.

Yes this is needed. Modified OTSDK to call dispose on the Activityprocessor it has.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will follow up on why Span was not calling Shutdown in SimpleProcessor! (Was doing it right for batching one). Anyway - both will be fixed in next PR.

{
foreach (var processor in this.processors)
{
try
{
if (processor is IDisposable disposable)
{
disposable.Dispose();
}
}
catch (Exception e)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException("Dispose", e);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public void AspNetRequestsAreCollectedSuccessfully(
options.TextFormat = textFormat.Object;
}
})
.SetProcessorPipeline(p => p.AddProcessor(_ => activityProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(_ => activityProcessor.Object))))
{
activity.Start();
this.fakeAspNetDiagnosticSource.Write(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public async Task SuccessfulTemplateControllerCallGeneratesASpan()
void ConfigureTestServices(IServiceCollection services)
{
this.openTelemetrySdk = OpenTelemetrySdk.EnableOpenTelemetry((builder) => builder.AddRequestInstrumentation()
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
}

// Arrange
Expand Down Expand Up @@ -100,7 +100,7 @@ public async Task SuccessfulTemplateControllerCallUsesParentContext()
builder.ConfigureTestServices(services =>
{
this.openTelemetrySdk = OpenTelemetrySdk.EnableOpenTelemetry((builder) => builder.AddRequestInstrumentation()
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
})))
{
using var client = testFactory.CreateClient();
Expand Down Expand Up @@ -148,7 +148,7 @@ public async Task CustomTextFormat()
{
this.openTelemetrySdk = OpenTelemetrySdk.EnableOpenTelemetry(
(builder) => builder.AddRequestInstrumentation((opt) => opt.TextFormat = textFormat.Object)
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
})))
{
using var client = testFactory.CreateClient();
Expand Down Expand Up @@ -181,7 +181,7 @@ void ConfigureTestServices(IServiceCollection services)
this.openTelemetrySdk = OpenTelemetrySdk.EnableOpenTelemetry(
(builder) =>
builder.AddRequestInstrumentation((opt) => opt.RequestFilter = (ctx) => ctx.Request.Path != "/api/values/2")
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
}

// Arrange
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public async Task SuccessfulTemplateControllerCallGeneratesASpan()
{
services.AddSingleton<CallbackMiddleware.CallbackMiddlewareImpl>(new TestCallbackMiddlewareImpl());
services.AddOpenTelemetrySdk((builder) => builder.AddRequestInstrumentation()
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
}))
.CreateClient())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public async Task HttpDependenciesInstrumentationInjectsHeadersAsync()

using (OpenTelemetrySdk.EnableOpenTelemetry(
(builder) => builder.AddHttpClientDependencyInstrumentation()
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
{
using var c = new HttpClient();
await c.SendAsync(request);
Expand Down Expand Up @@ -124,7 +124,7 @@ public async Task HttpDependenciesInstrumentationInjectsHeadersAsync_CustomForma

using (OpenTelemetrySdk.EnableOpenTelemetry(
(builder) => builder.AddHttpClientDependencyInstrumentation((opt) => opt.TextFormat = textFormat.Object)
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
{
using var c = new HttpClient();
await c.SendAsync(request);
Expand Down Expand Up @@ -154,7 +154,7 @@ public async Task HttpDependenciesInstrumentation_AddViaFactory_HttpInstrumentat

using (OpenTelemetrySdk.EnableOpenTelemetry(
(builder) => builder.AddHttpClientDependencyInstrumentation()
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
{
using var c = new HttpClient();
await c.GetAsync(this.url);
Expand All @@ -172,7 +172,7 @@ public async Task HttpDependenciesInstrumentation_AddViaFactory_DependencyInstru

using (OpenTelemetrySdk.EnableOpenTelemetry(
(builder) => builder.AddHttpClientDependencyInstrumentation()
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
{
using var c = new HttpClient();
await c.GetAsync(this.url);
Expand All @@ -198,7 +198,7 @@ public async Task HttpDependenciesInstrumentationBacksOffIfAlreadyInstrumented()

using (OpenTelemetrySdk.EnableOpenTelemetry(
(builder) => builder.AddHttpClientDependencyInstrumentation()
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
{
using var c = new HttpClient();
await c.SendAsync(request);
Expand All @@ -218,7 +218,7 @@ public async void HttpDependenciesInstrumentationFiltersOutRequests()
(opt) => opt.EventFilter = (activityName, arg1, _) => !(activityName == "System.Net.Http.HttpRequestOut" &&
arg1 is HttpRequestMessage request &&
request.RequestUri.OriginalString.Contains(this.url)))
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
{
using var c = new HttpClient();
await c.GetAsync(this.url);
Expand All @@ -234,7 +234,7 @@ public async Task HttpDependenciesInstrumentationFiltersOutRequestsToExporterEnd

using (OpenTelemetrySdk.EnableOpenTelemetry(
(builder) => builder.AddHttpClientDependencyInstrumentation()
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
{
using var c = new HttpClient();
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public async Task HttpOutCallsAreCollectedSuccessfullyAsync(HttpTestData.HttpOut

using (OpenTelemetrySdk.EnableOpenTelemetry(
(builder) => builder.AddHttpClientDependencyInstrumentation((opt) => opt.SetHttpFlavor = tc.SetHttpFlavor)
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
{
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public async Task HttpDependenciesInstrumentationInjectsHeadersAsync()
var activityProcessor = new Mock<ActivityProcessor>();
using var shutdownSignal = OpenTelemetrySdk.EnableOpenTelemetry(b =>
{
b.SetProcessorPipeline(c => c.AddProcessor(ap => activityProcessor.Object));
b.AddProcessorPipeline(c => c.AddProcessor(ap => activityProcessor.Object));
b.AddHttpWebRequestDependencyInstrumentation();
});

Expand Down Expand Up @@ -113,7 +113,7 @@ public async Task HttpDependenciesInstrumentationInjectsHeadersAsync_CustomForma
var activityProcessor = new Mock<ActivityProcessor>();
using var shutdownSignal = OpenTelemetrySdk.EnableOpenTelemetry(b =>
{
b.SetProcessorPipeline(c => c.AddProcessor(ap => activityProcessor.Object));
b.AddProcessorPipeline(c => c.AddProcessor(ap => activityProcessor.Object));
b.AddHttpWebRequestDependencyInstrumentation();
});

Expand Down Expand Up @@ -153,7 +153,7 @@ public async Task HttpDependenciesInstrumentationBacksOffIfAlreadyInstrumented()
var activityProcessor = new Mock<ActivityProcessor>();
using var shutdownSignal = OpenTelemetrySdk.EnableOpenTelemetry(b =>
{
b.SetProcessorPipeline(c => c.AddProcessor(ap => activityProcessor.Object));
b.AddProcessorPipeline(c => c.AddProcessor(ap => activityProcessor.Object));
b.AddHttpWebRequestDependencyInstrumentation();
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void HttpOutCallsAreCollectedSuccessfullyAsync(HttpTestData.HttpOutTestCa
var activityProcessor = new Mock<ActivityProcessor>();
using var shutdownSignal = OpenTelemetrySdk.EnableOpenTelemetry(b =>
{
b.SetProcessorPipeline(c => c.AddProcessor(ap => activityProcessor.Object));
b.AddProcessorPipeline(c => c.AddProcessor(ap => activityProcessor.Object));
b.AddHttpWebRequestDependencyInstrumentation();
});

Expand Down
Loading