Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 2 additions & 20 deletions src/Darp.Ble/BleObserverExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,7 @@ public static class BleObserverExtensions
public static IObservable<IGapAdvertisement> OnAdvertisement(this IBleObserver observer)
{
ArgumentNullException.ThrowIfNull(observer);
return Observable.Create<IGapAdvertisement>(advObserver =>
observer.OnAdvertisement(
advObserver,
static (advObserver, advertisement) => advObserver.OnNext(advertisement)
)
);
}

/// <summary> Register a callback called when an advertisement was received </summary>
/// <param name="bleObserver">The instance of <see cref="IBleObserver"/> that will monitor BLE advertisements.</param>
/// <param name="onAdvertisement"> The callback </param>
/// <returns> A disposable to unsubscribe the callback </returns>
public static IDisposable OnAdvertisement(this IBleObserver bleObserver, Action<IGapAdvertisement> onAdvertisement)
{
ArgumentNullException.ThrowIfNull(bleObserver);
return bleObserver.OnAdvertisement(onAdvertisement, static (action, advertisement) => action(advertisement));
return Observable.Create<IGapAdvertisement>(advObserver => observer.OnAdvertisement(advObserver.OnNext));
}

/// <summary> Publish the observer to allow observing advertisements without having to start/stop observation manually </summary>
Expand All @@ -43,10 +28,7 @@ public static IConnectableObservable<IGapAdvertisement> Publish(this IBleObserve

IObservable<IGapAdvertisement> inner = Observable.Create<IGapAdvertisement>(async observer =>
{
IDisposable unhook = bleObserver.OnAdvertisement(
observer,
onAdvertisement: static (observer, adv) => observer.OnNext(adv)
);
IDisposable unhook = bleObserver.OnAdvertisement(onAdvertisement: observer.OnNext);

await bleObserver.StartObservingAsync().ConfigureAwait(false);

Expand Down
4 changes: 1 addition & 3 deletions src/Darp.Ble/IBleObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@ public interface IBleObserver
bool Configure(BleObservationParameters parameters);

/// <summary> Register a callback called when an advertisement was received </summary>
/// <param name="state"> A state to be passed to the callback </param>
/// <param name="onAdvertisement"> The callback </param>
/// <typeparam name="T"> The type of the state </typeparam>
/// <returns> A disposable to unsubscribe the callback </returns>
IDisposable OnAdvertisement<T>(T state, Action<T, IGapAdvertisement> onAdvertisement);
IDisposable OnAdvertisement(Action<IGapAdvertisement> onAdvertisement);

/// <summary> Start observing for advertisements. </summary>
/// <param name="cancellationToken"> The CancellationToken to cancel the initial starting process </param>
Expand Down
118 changes: 78 additions & 40 deletions src/Darp.Ble/Implementation/BleObserver.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
using System.Diagnostics;
using System.Reactive.Disposables;
using Darp.Ble.Data;
using Darp.Ble.Exceptions;
using Darp.Ble.Gap;
using Darp.Ble.Utils;
using Microsoft.Extensions.Logging;
#if !NET9_0_OR_GREATER
using Lock = System.Object;
Expand All @@ -22,13 +22,14 @@ internal enum ObserverState
/// <summary> The ble observer </summary>
/// <param name="device"> The ble device </param>
/// <param name="logger"> The logger </param>
public abstract class BleObserver(BleDevice device, ILogger<BleObserver> logger) : IAsyncDisposable, IBleObserver
public abstract class BleObserver(BleDevice device, ILogger<BleObserver> logger) : IBleObserver, IAsyncDisposable
{
private readonly BleDevice _bleDevice = device;
private readonly List<Action<IGapAdvertisement>> _actions = [];
private readonly Lock _lock = new();
private readonly SemaphoreSlim _observationStartSemaphore = new(1, 1);
private ObserverState _observerState = ObserverState.Stopped;
private readonly SemaphoreSlim _startStopSemaphore = new(1, 1);
private readonly Lock _handlersLock = new();
private Action<IGapAdvertisement>[] _handlers = [];

private volatile ObserverState _observerState = ObserverState.Stopped;

/// <summary> The logger </summary>
protected ILogger<BleObserver> Logger { get; } = logger;
Expand All @@ -54,28 +55,40 @@ public abstract class BleObserver(BleDevice device, ILogger<BleObserver> logger)
/// <inheritdoc />
public bool Configure(BleObservationParameters parameters)
{
if (_observerState is not ObserverState.Stopped)
ObjectDisposedException.ThrowIf(_bleDevice.IsDisposing, nameof(BleObserver));

if (!_startStopSemaphore.Wait(0))
return false;
Parameters = parameters;
return true;
try
{
if (_observerState is not ObserverState.Stopped)
return false;
Parameters = parameters;
return true;
}
finally
{
_startStopSemaphore.Release();
}
}

/// <inheritdoc />
public async Task StartObservingAsync(CancellationToken cancellationToken = default)
{
ObjectDisposedException.ThrowIf(_bleDevice.IsDisposing, nameof(BleObserver));

await _observationStartSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
await _startStopSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
if (_observerState is ObserverState.Observing)
return;
if (_observerState is not ObserverState.Stopped)
throw new InvalidOperationException($"Observer is in invalid state {_observerState}");

Debug.Assert(_observerState == ObserverState.Stopped);
_observerState = ObserverState.Starting;
await StartObservingAsyncCore(cancellationToken).ConfigureAwait(false);
_observerState = ObserverState.Observing;
Logger.LogTrace("Started advertising observation");
Logger.LogObserverStarted();
}
catch (Exception e) when (e is not BleObservationStartException)
{
Expand All @@ -85,28 +98,34 @@ public async Task StartObservingAsync(CancellationToken cancellationToken = defa
}
finally
{
_observationStartSemaphore.Release();
_startStopSemaphore.Release();
}
}

/// <inheritdoc />
public IDisposable OnAdvertisement<T>(T state, Action<T, IGapAdvertisement> onAdvertisement)
public IDisposable OnAdvertisement(Action<IGapAdvertisement> onAdvertisement)
{
ObjectDisposedException.ThrowIf(_bleDevice.IsDisposing, nameof(BleObserver));
Action<IGapAdvertisement> action = advertisement => onAdvertisement(state, advertisement);
lock (_lock)

// Extend handlers list
lock (_handlersLock)
{
_actions.Add(action);
Action<IGapAdvertisement>[] oldHandlers = _handlers;
var newArr = new Action<IGapAdvertisement>[oldHandlers.Length + 1];
Array.Copy(oldHandlers, newArr, oldHandlers.Length);
newArr[^1] = onAdvertisement;
Volatile.Write(ref _handlers, newArr);
}

return Disposable.Create(
(this, action),
(this, onAdvertisement),
static tuple =>
{
(BleObserver bleObserver, Action<IGapAdvertisement> action) = tuple;
lock (bleObserver._lock)
(BleObserver self, Action<IGapAdvertisement> handler) = tuple;
lock (self._handlersLock)
{
bleObserver._actions.Remove(action);
if (Helpers.TryRemoveImmutable(self._handlers, handler, out var newHandlers))
Volatile.Write(ref self._handlers, newHandlers);
}
}
);
Expand All @@ -116,19 +135,25 @@ public IDisposable OnAdvertisement<T>(T state, Action<T, IGapAdvertisement> onAd
/// <param name="advertisement"> The advertisement </param>
protected void OnNext(IGapAdvertisement advertisement)
{
lock (_lock)
// Try to suppress receival of advertisements after stopping/disposal
// Best-effort only. No thread safety guarantees
if (_bleDevice.IsDisposing || _observerState is ObserverState.Stopping or ObserverState.Stopped)
return;

// Taking the current snapshot of the handlers.
// In case of an unsubscription of a handler we might have taken the reference here already and call it afterward.
// This is a known tradeoff
Action<IGapAdvertisement>[] handlers = Volatile.Read(ref _handlers);
foreach (Action<IGapAdvertisement> handler in handlers)
{
for (int i = _actions.Count - 1; i >= 0; i--)
try
{
try
{
var onAdvertisement = _actions[i];
onAdvertisement(advertisement);
}
catch (Exception e)
{
Logger.LogWarning(e, "Exception while handling advertisement event: {Message}", e.Message);
}
handler(advertisement);
}
catch (Exception e)
{
// An exception inside the handler should not crash all observers. Logging and ignoring ...
Logger.LogObservationErrorDuringAdvertisementHandling(e);
}
}
}
Expand All @@ -141,24 +166,37 @@ protected void OnNext(IGapAdvertisement advertisement)
/// <inheritdoc />
public async Task StopObservingAsync()
{
await _observationStartSemaphore.WaitAsync().ConfigureAwait(false);
await _startStopSemaphore.WaitAsync().ConfigureAwait(false);
try
{
// Return early if
// Stopped -> Nothing to do
// Stopping -> Some recursive call has lead to us being here
if (_observerState is ObserverState.Stopping or ObserverState.Stopped)
return;
if (_observerState is not ObserverState.Observing)
throw new InvalidOperationException($"Observer is in invalid state {_observerState}");

Debug.Assert(_observerState == ObserverState.Observing);
_observerState = ObserverState.Stopping;
await StopObservingAsyncCore().ConfigureAwait(false);
Logger.LogTrace("Stopped advertising observation");

try
{
await StopObservingAsyncCore().ConfigureAwait(false);
}
catch (Exception e)
{
Logger.LogObserverErrorDuringStopping(e);
// In case of an error when stopping we assume we are still observing
// Not ideal, but better than to wait for ever
_observerState = ObserverState.Observing;
throw;
}
Logger.LogObserverStopped();
_observerState = ObserverState.Stopped;
}
finally
{
_observationStartSemaphore.Release();
_startStopSemaphore.Release();
}
}

Expand All @@ -171,11 +209,11 @@ public async ValueTask DisposeAsync()
{
GC.SuppressFinalize(this);
await StopObservingAsync().ConfigureAwait(false);
lock (_lock)
lock (_handlersLock)
{
_actions.Clear();
_handlers = [];
}
_observationStartSemaphore.Dispose();
_startStopSemaphore.Dispose();
await DisposeAsyncCore().ConfigureAwait(false);
}

Expand Down
12 changes: 12 additions & 0 deletions src/Darp.Ble/Logging.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,16 @@ internal static partial class Logging

[LoggerMessage(Level = LogLevel.Trace, Message = "Ble server peer '{Address}' disposed!")]
public static partial void LogBleServerPeerDisposed(this ILogger logger, BleAddress address);

[LoggerMessage(Level = LogLevel.Trace, Message = "Started advertising observation")]
public static partial void LogObserverStarted(this ILogger logger);

[LoggerMessage(Level = LogLevel.Trace, Message = "Stopped advertising observation")]
public static partial void LogObserverStopped(this ILogger logger);

[LoggerMessage(Level = LogLevel.Error, Message = "Exception while handling advertisement event")]
public static partial void LogObservationErrorDuringAdvertisementHandling(this ILogger logger, Exception e);

[LoggerMessage(Level = LogLevel.Error, Message = "Exception while stopping observation")]
public static partial void LogObserverErrorDuringStopping(this ILogger logger, Exception e);
}
32 changes: 32 additions & 0 deletions src/Darp.Ble/Utils/Helpers.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System.Diagnostics.CodeAnalysis;

namespace Darp.Ble.Utils;

internal static class Helpers
{
public static bool TryRemoveImmutable<T>(T[] array, T item, [NotNullWhen(true)] out T[]? newArray)
{
// Check if there is a handler to remove
int handlerIndex = Array.IndexOf(array, item);
if (handlerIndex < 0)
{
newArray = null;
return false;
}

Span<T> arraySpan = array;
if (arraySpan.Length == 1)
{
newArray = [];
return true;
}

// Remove the handler from the array
newArray = new T[arraySpan.Length - 1];
if (handlerIndex > 0)
arraySpan[..handlerIndex].CopyTo(newArray);
if (handlerIndex < arraySpan.Length - 1)
arraySpan[(handlerIndex + 1)..].CopyTo(newArray.AsSpan()[handlerIndex..]);
return true;
}
}
Loading
Loading