Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -10,7 +11,7 @@ namespace System.Reactive.Concurrency
{
public static class AsyncScheduler
{
public static IAwaitable RendezVous(this IAsyncScheduler scheduler, CancellationToken token = default)
public static RendezVousAwaitable RendezVous(this IAsyncScheduler scheduler, CancellationToken token = default)
{
if (scheduler == null)
throw new ArgumentNullException(nameof(scheduler));
Expand All @@ -20,7 +21,7 @@ public static IAwaitable RendezVous(this IAsyncScheduler scheduler, Cancellation
return new RendezVousAwaitable(scheduler, token);
}

public static IAwaitable RendezVous(this Task task, IAsyncScheduler scheduler, CancellationToken token = default)
public static TaskAwaitable RendezVous(this Task task, IAsyncScheduler scheduler, CancellationToken token = default)
{
if (task == null)
throw new ArgumentNullException(nameof(task));
Expand All @@ -30,7 +31,7 @@ public static IAwaitable RendezVous(this Task task, IAsyncScheduler scheduler, C
return new TaskAwaitable(task, continueOnCapturedContext: false, scheduler, token);
}

public static IAwaitable<T> RendezVous<T>(this Task<T> task, IAsyncScheduler scheduler, CancellationToken token = default)
public static TaskAwaitable<T> RendezVous<T>(this Task<T> task, IAsyncScheduler scheduler, CancellationToken token = default)
{
if (task == null)
throw new ArgumentNullException(nameof(task));
Expand All @@ -40,15 +41,15 @@ public static IAwaitable<T> RendezVous<T>(this Task<T> task, IAsyncScheduler sch
return new TaskAwaitable<T>(task, continueOnCapturedContext: false, scheduler, token);
}

public static IAwaitable RendezVous(this ValueTask task, IAsyncScheduler scheduler, CancellationToken token = default)
public static ValueTaskAwaitable RendezVous(this ValueTask task, IAsyncScheduler scheduler, CancellationToken token = default)
{
if (scheduler == null)
throw new ArgumentNullException(nameof(scheduler));

return new ValueTaskAwaitable(task, continueOnCapturedContext: false, scheduler, token);
}

public static IAwaitable<T> RendezVous<T>(this ValueTask<T> task, IAsyncScheduler scheduler, CancellationToken token = default)
public static ValueTaskAwaitable<T> RendezVous<T>(this ValueTask<T> task, IAsyncScheduler scheduler, CancellationToken token = default)
{
if (scheduler == null)
throw new ArgumentNullException(nameof(scheduler));
Expand Down Expand Up @@ -196,54 +197,66 @@ public static async ValueTask<TResult> ExecuteAsync<TResult>(this IAsyncSchedule
}
}

private sealed class RendezVousAwaitable : IAwaitable, IAwaiter // PERF: Can we avoid these allocations?
public readonly struct RendezVousAwaitable
{
private readonly IAsyncScheduler _scheduler;
private readonly CancellationToken _token;
private ExceptionDispatchInfo _error;

public RendezVousAwaitable(IAsyncScheduler scheduler, CancellationToken token)
{
_scheduler = scheduler;
_token = token;
}

public bool IsCompleted { get; private set; }
public RendezVousAwaiter GetAwaiter() => new RendezVousAwaiter(_scheduler, _token);

public IAwaiter GetAwaiter() => this;

public void GetResult()
public sealed class RendezVousAwaiter : INotifyCompletion
{
if (!IsCompleted)
{
throw new InvalidOperationException(); // REVIEW: No support for blocking.
}
private readonly IAsyncScheduler _scheduler;
private readonly CancellationToken _token;
private ExceptionDispatchInfo _error;

if (_error != null)
public RendezVousAwaiter(IAsyncScheduler scheduler, CancellationToken token)
{
_error.Throw();
_scheduler = scheduler;
_token = token;
}
}

public void OnCompleted(Action continuation)
{
var t = _scheduler.ExecuteAsync(ct =>
public bool IsCompleted { get; private set; }

public void GetResult()
{
try
if (!IsCompleted)
{
continuation();
throw new InvalidOperationException(); // REVIEW: No support for blocking.
}
catch (Exception ex)
{
_error = ExceptionDispatchInfo.Capture(ex);
}
finally

if (_error != null)
{
IsCompleted = true;
_error.Throw();
}
}

return default;
}, _token);
public void OnCompleted(Action continuation)
{
var t = _scheduler.ExecuteAsync(ct =>
{
try
{
continuation();
}
catch (Exception ex)
{
_error = ExceptionDispatchInfo.Capture(ex);
}
finally
{
IsCompleted = true;
}

return default;
}, _token);
}
}
}
}
Expand Down
Loading