Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
OpenTelemetry.Metrics.Metric.MeterSchemaUrl.get -> string!
OpenTelemetry.Metrics.MetricReaderTemporalityPreference.LowMemory = 3 -> OpenTelemetry.Metrics.MetricReaderTemporalityPreference
OpenTelemetry.BatchActivityExportProcessor.BatchActivityExportProcessor(OpenTelemetry.BaseExporter<System.Diagnostics.Activity!>! exporter, OpenTelemetry.BatchExportProcessorOptions<System.Diagnostics.Activity!>! options) -> void
OpenTelemetry.BatchExportProcessor<T>.BatchExportProcessor(OpenTelemetry.BaseExporter<T!>! exporter, OpenTelemetry.BatchExportProcessorOptions<T!>! options) -> void
OpenTelemetry.BatchLogRecordExportProcessor.BatchLogRecordExportProcessor(OpenTelemetry.BaseExporter<OpenTelemetry.Logs.LogRecord!>! exporter, OpenTelemetry.BatchExportProcessorOptions<OpenTelemetry.Logs.LogRecord!>! options) -> void
OpenTelemetry.Metrics.PeriodicExportingMetricReader.PeriodicExportingMetricReader(OpenTelemetry.BaseExporter<OpenTelemetry.Metrics.Metric!>! exporter, OpenTelemetry.Metrics.PeriodicExportingMetricReaderOptions! options) -> void
225 changes: 60 additions & 165 deletions src/OpenTelemetry/BatchExportProcessor.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Diagnostics;
using System.Runtime.CompilerServices;
using OpenTelemetry.Internal;

Expand All @@ -24,12 +23,7 @@ public abstract class BatchExportProcessor<T> : BaseExportProcessor<T>
internal readonly int ExporterTimeoutMilliseconds;

private readonly CircularBuffer<T> circularBuffer;
private readonly Thread exporterThread;
private readonly AutoResetEvent exportTrigger = new(false);
private readonly ManualResetEvent dataExportedNotification = new(false);
private readonly ManualResetEvent shutdownTrigger = new(false);
private long shutdownDrainTarget = long.MaxValue;
private long droppedCount;
private readonly BatchExportWorker<T> worker;
private bool disposed;

/// <summary>
Expand All @@ -46,31 +40,48 @@ protected BatchExportProcessor(
int scheduledDelayMilliseconds = DefaultScheduledDelayMilliseconds,
int exporterTimeoutMilliseconds = DefaultExporterTimeoutMilliseconds,
int maxExportBatchSize = DefaultMaxExportBatchSize)
: this(exporter, new BatchExportProcessorOptions<T>
{
MaxQueueSize = maxQueueSize,
ScheduledDelayMilliseconds = scheduledDelayMilliseconds,
ExporterTimeoutMilliseconds = exporterTimeoutMilliseconds,
MaxExportBatchSize = maxExportBatchSize,
})
{
}

/// <summary>
/// Initializes a new instance of the <see cref="BatchExportProcessor{T}"/> class.
/// </summary>
/// <param name="exporter">Exporter instance.</param>
/// <param name="options">Configuration options for the batch export processor.</param>
protected BatchExportProcessor(
BaseExporter<T> exporter,
BatchExportProcessorOptions<T> options)
: base(exporter)
{
Guard.ThrowIfNull(options);

var maxQueueSize = options?.MaxQueueSize ?? 0;
Guard.ThrowIfOutOfRange(maxQueueSize, min: 1);
Guard.ThrowIfOutOfRange(maxExportBatchSize, min: 1, max: maxQueueSize, maxName: nameof(maxQueueSize));
Guard.ThrowIfOutOfRange(scheduledDelayMilliseconds, min: 1);
Guard.ThrowIfOutOfRange(exporterTimeoutMilliseconds, min: 0);

this.circularBuffer = new CircularBuffer<T>(maxQueueSize);
this.ScheduledDelayMilliseconds = scheduledDelayMilliseconds;
this.ExporterTimeoutMilliseconds = exporterTimeoutMilliseconds;
this.MaxExportBatchSize = maxExportBatchSize;
this.exporterThread = new Thread(this.ExporterProc)
{
IsBackground = true,
#pragma warning disable CA1062 // Validate arguments of public methods - needed for netstandard2.1
Name = $"OpenTelemetry-{nameof(BatchExportProcessor<T>)}-{exporter.GetType().Name}",
#pragma warning restore CA1062 // Validate arguments of public methods - needed for netstandard2.1
};
this.exporterThread.Start();
this.ScheduledDelayMilliseconds = options?.ScheduledDelayMilliseconds ?? 0;
this.ExporterTimeoutMilliseconds = options?.ExporterTimeoutMilliseconds ?? -1;
this.MaxExportBatchSize = options?.MaxExportBatchSize ?? 0;

Guard.ThrowIfOutOfRange(this.MaxExportBatchSize, min: 1, max: maxQueueSize, maxName: nameof(options.MaxQueueSize));
Guard.ThrowIfOutOfRange(this.ScheduledDelayMilliseconds, min: 1);
Guard.ThrowIfOutOfRange(this.ExporterTimeoutMilliseconds, min: 0);

this.worker = this.CreateWorker();
this.worker.Start();
}

/// <summary>
/// Gets the number of telemetry objects dropped by the processor.
/// </summary>
internal long DroppedCount => Volatile.Read(ref this.droppedCount);
internal long DroppedCount => this.worker.DroppedCount;

/// <summary>
/// Gets the number of telemetry objects received by the processor.
Expand All @@ -89,20 +100,14 @@ internal bool TryExport(T data)
{
if (this.circularBuffer.Count >= this.MaxExportBatchSize)
{
try
{
this.exportTrigger.Set();
}
catch (ObjectDisposedException)
{
}
this.worker.TriggerExport();
}

return true; // enqueue succeeded
}

// either the queue is full or exceeded the spin limit, drop the item on the floor
Interlocked.Increment(ref this.droppedCount);
this.worker.IncrementDroppedCount();

return false;
}
Expand All @@ -116,113 +121,27 @@ protected override void OnExport(T data)
/// <inheritdoc/>
protected override bool OnForceFlush(int timeoutMilliseconds)
{
var tail = this.circularBuffer.RemovedCount;
var head = this.circularBuffer.AddedCount;

if (head == tail)
{
return true; // nothing to flush
}

try
{
this.exportTrigger.Set();
}
catch (ObjectDisposedException)
{
return false;
}

if (timeoutMilliseconds == 0)
{
return false;
}

var triggers = new WaitHandle[] { this.dataExportedNotification, this.shutdownTrigger };

var sw = timeoutMilliseconds == Timeout.Infinite
? null
: Stopwatch.StartNew();

// There is a chance that the export thread finished processing all the data from the queue,
// and signaled before we enter wait here, use polling to prevent being blocked indefinitely.
const int pollingMilliseconds = 1000;

while (true)
{
if (sw == null)
{
try
{
WaitHandle.WaitAny(triggers, pollingMilliseconds);
}
catch (ObjectDisposedException)
{
return false;
}
}
else
{
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;

if (timeout <= 0)
{
return this.circularBuffer.RemovedCount >= head;
}

try
{
WaitHandle.WaitAny(triggers, Math.Min((int)timeout, pollingMilliseconds));
}
catch (ObjectDisposedException)
{
return false;
}
}

if (this.circularBuffer.RemovedCount >= head)
{
return true;
}

if (Volatile.Read(ref this.shutdownDrainTarget) != long.MaxValue)
{
return false;
}
}
return this.worker.WaitForExport(timeoutMilliseconds);
}

/// <inheritdoc/>
protected override bool OnShutdown(int timeoutMilliseconds)
{
Volatile.Write(ref this.shutdownDrainTarget, this.circularBuffer.AddedCount);

try
{
this.shutdownTrigger.Set();
}
catch (ObjectDisposedException)
{
return false;
}
var result = this.worker.Shutdown(timeoutMilliseconds);

OpenTelemetrySdkEventSource.Log.DroppedExportProcessorItems(this.GetType().Name, this.exporter.GetType().Name, this.DroppedCount);

if (timeoutMilliseconds == Timeout.Infinite)
{
this.exporterThread.Join();
return this.exporter.Shutdown();
return this.exporter.Shutdown() && result;
}

if (timeoutMilliseconds == 0)
{
return this.exporter.Shutdown(0);
return this.exporter.Shutdown(0) && result;
}

var sw = Stopwatch.StartNew();
this.exporterThread.Join(timeoutMilliseconds);
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;
return this.exporter.Shutdown((int)Math.Max(timeout, 0));
return this.exporter.Shutdown(timeoutMilliseconds) && result;
}

/// <inheritdoc/>
Expand All @@ -232,9 +151,7 @@ protected override void Dispose(bool disposing)
{
if (disposing)
{
this.exportTrigger.Dispose();
this.dataExportedNotification.Dispose();
this.shutdownTrigger.Dispose();
this.worker?.Dispose();
}

this.disposed = true;
Expand All @@ -243,49 +160,27 @@ protected override void Dispose(bool disposing)
base.Dispose(disposing);
}

private void ExporterProc()
private BatchExportWorker<T> CreateWorker()
{
var triggers = new WaitHandle[] { this.exportTrigger, this.shutdownTrigger };

while (true)
#if NET
// Use task-based worker for browser platform where threading may be limited
if (ThreadingHelper.IsThreadingDisabled())
{
// only wait when the queue doesn't have enough items, otherwise keep busy and send data continuously
if (this.circularBuffer.Count < this.MaxExportBatchSize)
{
try
{
WaitHandle.WaitAny(triggers, this.ScheduledDelayMilliseconds);
}
catch (ObjectDisposedException)
{
// the exporter is somehow disposed before the worker thread could finish its job
return;
}
}

if (this.circularBuffer.Count > 0)
{
using (var batch = new Batch<T>(this.circularBuffer, this.MaxExportBatchSize))
{
this.exporter.Export(batch);
}

try
{
this.dataExportedNotification.Set();
this.dataExportedNotification.Reset();
}
catch (ObjectDisposedException)
{
// the exporter is somehow disposed before the worker thread could finish its job
return;
}
}

if (this.circularBuffer.RemovedCount >= Volatile.Read(ref this.shutdownDrainTarget))
{
return;
}
return new BatchExportTaskWorker<T>(
this.circularBuffer,
this.exporter,
this.MaxExportBatchSize,
this.ScheduledDelayMilliseconds,
this.ExporterTimeoutMilliseconds);
}
#endif

// Use thread-based worker for all other platforms
return new BatchExportThreadWorker<T>(
this.circularBuffer,
this.exporter,
this.MaxExportBatchSize,
this.ScheduledDelayMilliseconds,
this.ExporterTimeoutMilliseconds);
}
}
Loading