Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,12 @@ internal EventProcessorHost GetEventProcessorHost(string eventHubName, string co
consumerGroup ??= EventHubConsumerClient.DefaultConsumerGroupName;

// Use blob prefix support available in EPH starting in 2.2.6
EventProcessorHost host = new EventProcessorHost(
EventProcessorHost host = new EventProcessorHost(consumerGroup: consumerGroup,
connectionString: creds.EventHubConnectionString,
eventHubName: eventHubName,
consumerGroupName: consumerGroup,
eventHubConnectionString: creds.EventHubConnectionString,
exceptionHandler: _exceptionHandler);
options: this.EventProcessorOptions,
eventBatchMaximumCount: _maxBatchSize,
invokeProcessorAfterReceiveTimeout: InvokeProcessorAfterReceiveTimeout, exceptionHandler: _exceptionHandler);

return host;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ internal sealed class EventHubListener : IListener, IEventProcessorFactory, ISca
private readonly BlobsCheckpointStore _checkpointStore;
private readonly EventHubOptions _options;
private readonly ILogger _logger;
private bool _started;

private Lazy<EventHubsScaleMonitor> _scaleMonitor;

Expand Down Expand Up @@ -70,17 +69,12 @@ void IDisposable.Dispose()

public async Task StartAsync(CancellationToken cancellationToken)
{
await _eventProcessorHost.RegisterEventProcessorFactoryAsync(this, _options.MaxBatchSize, _options.InvokeProcessorAfterReceiveTimeout, _checkpointStore, _options.EventProcessorOptions).ConfigureAwait(false);
_started = true;
await _eventProcessorHost.StartProcessingAsync(this, _checkpointStore, cancellationToken).ConfigureAwait(false);
}

public async Task StopAsync(CancellationToken cancellationToken)
{
if (_started)
{
await _eventProcessorHost.UnregisterEventProcessorAsync().ConfigureAwait(false);
}
_started = false;
await _eventProcessorHost.StopProcessingAsync(cancellationToken).ConfigureAwait(false);
}

IEventProcessor IEventProcessorFactory.CreateEventProcessor()
Expand All @@ -93,37 +87,27 @@ public IScaleMonitor GetMonitor()
return _scaleMonitor.Value;
}

/// <summary>
/// Wrapper for un-mockable checkpoint APIs to aid in unit testing
/// </summary>
internal interface ICheckpointer
{
Task CheckpointAsync(ProcessorPartitionContext context);
}

// We get a new instance each time Start() is called.
// We'll get a listener per partition - so they can potentialy run in parallel even on a single machine.
internal class EventProcessor : IEventProcessor, IDisposable, ICheckpointer
internal class EventProcessor : IEventProcessor, IDisposable
{
private readonly ITriggeredFunctionExecutor _executor;
private readonly bool _singleDispatch;
private readonly ILogger _logger;
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
private readonly ICheckpointer _checkpointer;
private readonly int _batchCheckpointFrequency;
private int _batchCounter;
private bool _disposed;

public EventProcessor(EventHubOptions options, ITriggeredFunctionExecutor executor, ILogger logger, bool singleDispatch, ICheckpointer checkpointer = null)
public EventProcessor(EventHubOptions options, ITriggeredFunctionExecutor executor, ILogger logger, bool singleDispatch)
{
_checkpointer = checkpointer ?? this;
_executor = executor;
_singleDispatch = singleDispatch;
_batchCheckpointFrequency = options.BatchCheckpointFrequency;
_logger = logger;
}

public Task CloseAsync(ProcessorPartitionContext context, ProcessingStoppedReason reason)
public Task CloseAsync(EventProcessorHostPartition context, ProcessingStoppedReason reason)
{
// signal cancellation for any in progress executions
_cts.Cancel();
Expand All @@ -132,13 +116,13 @@ public Task CloseAsync(ProcessorPartitionContext context, ProcessingStoppedReaso
return Task.CompletedTask;
}

public Task OpenAsync(ProcessorPartitionContext context)
public Task OpenAsync(EventProcessorHostPartition context)
{
_logger.LogDebug(GetOperationDetails(context, "OpenAsync"));
return Task.CompletedTask;
}

public Task ProcessErrorAsync(ProcessorPartitionContext context, Exception error)
public Task ProcessErrorAsync(EventProcessorHostPartition context, Exception error)
{
string errorDetails = $"Processing error (Partition Id: '{context.PartitionId}', Owner: '{context.Owner}', EventHubPath: '{context.EventHubPath}').";

Expand All @@ -147,7 +131,7 @@ public Task ProcessErrorAsync(ProcessorPartitionContext context, Exception error
return Task.CompletedTask;
}

public async Task ProcessEventsAsync(ProcessorPartitionContext context, IEnumerable<EventData> messages)
public async Task ProcessEventsAsync(EventProcessorHostPartition context, IEnumerable<EventData> messages)
{
var triggerInput = new EventHubTriggerInput
{
Expand Down Expand Up @@ -209,8 +193,7 @@ public async Task ProcessEventsAsync(ProcessorPartitionContext context, IEnumera
// code, and capture/log/persist failed events, since they won't be retried.
if (messages.Any())
{
context.CheckpointEvent = messages.Last();
await CheckpointAsync(context).ConfigureAwait(false);
await CheckpointAsync(messages.Last(), context).ConfigureAwait(false);
}
}

Expand All @@ -222,12 +205,12 @@ private async Task TryExecuteWithLoggingAsync(TriggeredFunctionData input, Event
}
}

private async Task CheckpointAsync(ProcessorPartitionContext context)
private async Task CheckpointAsync(EventData checkpointEvent, EventProcessorHostPartition context)
{
bool checkpointed = false;
if (_batchCheckpointFrequency == 1)
{
await _checkpointer.CheckpointAsync(context).ConfigureAwait(false);
await context.CheckpointAsync(checkpointEvent).ConfigureAwait(false);
checkpointed = true;
}
else
Expand All @@ -236,7 +219,7 @@ private async Task CheckpointAsync(ProcessorPartitionContext context)
if (++_batchCounter >= _batchCheckpointFrequency)
{
_batchCounter = 0;
await _checkpointer.CheckpointAsync(context).ConfigureAwait(false);
await context.CheckpointAsync(checkpointEvent).ConfigureAwait(false);
checkpointed = true;
}
}
Expand Down Expand Up @@ -264,11 +247,6 @@ public void Dispose()
Dispose(true);
}

async Task ICheckpointer.CheckpointAsync(ProcessorPartitionContext context)
{
await context.CheckpointAsync().ConfigureAwait(false);
}

private static Dictionary<string, object> GetLinksScope(EventData message)
{
if (TryGetLinkedActivity(message, out var link))
Expand Down Expand Up @@ -319,7 +297,7 @@ private static bool TryGetLinkedActivity(EventData message, out Activity link)
return false;
}

private static string GetOperationDetails(ProcessorPartitionContext context, string operation)
private static string GetOperationDetails(EventProcessorHostPartition context, string operation)
{
StringWriter sw = new StringWriter();
using (JsonTextWriter writer = new JsonTextWriter(sw) { Formatting = Formatting.None })
Expand Down
Loading