diff --git a/OptimizelySDK.Tests/OdpTests/OdpEventManagerTests.cs b/OptimizelySDK.Tests/OdpTests/OdpEventManagerTests.cs index 25c9be82..87ce608d 100644 --- a/OptimizelySDK.Tests/OdpTests/OdpEventManagerTests.cs +++ b/OptimizelySDK.Tests/OdpTests/OdpEventManagerTests.cs @@ -311,7 +311,7 @@ public void ShouldAddAdditionalInformationToEachEvent() } [Test] - public void ShouldAttemptToFlushAnEmptyQueueAtFlushInterval() + public void ShouldNotAttemptToFlushAnEmptyQueueAtFlushInterval() { var eventManager = new OdpEventManager.Builder(). WithOdpEventApiManager(_mockApiManager.Object). @@ -327,7 +327,7 @@ public void ShouldAttemptToFlushAnEmptyQueueAtFlushInterval() eventManager.Stop(); _mockLogger.Verify(l => l.Log(LogLevel.DEBUG, "Flushing queue."), - Times.AtLeast(3)); + Times.Never); } [Test] diff --git a/OptimizelySDK/Odp/OdpEventManager.cs b/OptimizelySDK/Odp/OdpEventManager.cs index 13eef794..3039e7f2 100644 --- a/OptimizelySDK/Odp/OdpEventManager.cs +++ b/OptimizelySDK/Odp/OdpEventManager.cs @@ -146,35 +146,45 @@ protected virtual void Run() { while (true) { - if (DateTime.Now.MillisecondsSince1970() > _flushingIntervalDeadline) + object item; + // If batch has events, set the timeout to remaining time for flush interval, + // otherwise wait for the new event indefinitely + if (_currentBatch.Count > 0) { - _logger.Log(LogLevel.DEBUG, $"Flushing queue."); - FlushQueue(); + _eventQueue.TryTake(out item, (int)(_flushingIntervalDeadline - DateTime.Now.MillisecondsSince1970())); + } + else + { + item = _eventQueue.Take(); + Thread.Sleep(1); // TODO: need to figure out why this is allowing item to read shutdown signal. } - if (!_eventQueue.TryTake(out object item, 50)) + if (item == null) { - Thread.Sleep(50); + // null means no new events received and flush interval is over, dispatch whatever is in the batch. + if (_currentBatch.Count != 0) + { + _logger.Log(LogLevel.DEBUG, $"Flushing queue."); + FlushQueue(); + } continue; } - - if (item == _shutdownSignal) + else if (item == _shutdownSignal) { _logger.Log(LogLevel.INFO, "Received shutdown signal."); break; } - - if (item == _flushSignal) + else if (item == _flushSignal) { _logger.Log(LogLevel.DEBUG, "Received flush signal."); FlushQueue(); continue; } - - if (item is OdpEvent odpEvent) + else if (item is OdpEvent odpEvent) { AddToBatch(odpEvent); } + } } catch (InvalidOperationException ioe)