Skip to content

Blazor Streaming Interop | JS to DotNet #33491

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 45 commits into from
Jun 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
1d32e1e
Streaming Interop (WIP)
TanayParikh Jun 10, 2021
66dbaf5
Cancel JS Data Stream (with clearTimeout)
TanayParikh Jun 11, 2021
2509ce5
Cancel without clearTimeout
TanayParikh Jun 11, 2021
50bcb31
Determine chunk size based on SignalR limit
TanayParikh Jun 11, 2021
5b69105
Add DeserializedJSObjectReferenceValues
TanayParikh Jun 11, 2021
519f854
Update JSObjectReference.cs
TanayParikh Jun 11, 2021
b35027b
Move RemoteJSDataStream Instances to RemoteJSRuntime
TanayParikh Jun 11, 2021
46f4ddf
Cancel using SupplyJSDataChunk
TanayParikh Jun 11, 2021
c77fd3e
Merge branch 'main' into taparik/streamingInterop
TanayParikh Jun 11, 2021
e1635e6
Update blazor.server.js
TanayParikh Jun 11, 2021
1b76862
Merge branch 'main' into taparik/streamingInterop
TanayParikh Jun 14, 2021
c97360c
PR Feedback
TanayParikh Jun 14, 2021
b266fba
JSCallResultType JSDataReference
TanayParikh Jun 14, 2021
7133bd6
Update PublicAPI.Unshipped.txt
TanayParikh Jun 14, 2021
e3956fe
PR Feedback Continued
TanayParikh Jun 14, 2021
639b34f
Unit Tests
TanayParikh Jun 14, 2021
7f89312
@pranavkm feedback
TanayParikh Jun 14, 2021
115d935
@BrennanConroy PR Feedback
TanayParikh Jun 14, 2021
1e01c62
E2E JS To .NET Interop Test
TanayParikh Jun 14, 2021
80a2171
Merge branch 'main' into taparik/streamingInterop
TanayParikh Jun 14, 2021
6a931ad
Update blazor.server.js
TanayParikh Jun 14, 2021
e799997
Update InteropComponent.razor
TanayParikh Jun 14, 2021
9296a36
Make LinkedCTS More Efficient
TanayParikh Jun 15, 2021
e07686d
Update RemoteJSDataStream.cs
TanayParikh Jun 15, 2021
62f2338
Merge branch 'main' into taparik/streamingInterop
TanayParikh Jun 15, 2021
e7f44a2
Merge branch 'main' into taparik/streamingInterop
TanayParikh Jun 17, 2021
12f7b89
Revert SignalR Changes
TanayParikh Jun 17, 2021
e37413b
PR Feedback
TanayParikh Jun 17, 2021
eafd3f8
E2E tests
TanayParikh Jun 18, 2021
903b16d
JSDataReference->JSStreamReference
TanayParikh Jun 18, 2021
0bb3e6c
PR Feedback
TanayParikh Jun 19, 2021
381c53d
1 Minutes Inactivity Timeout
TanayParikh Jun 21, 2021
6e09532
Integrate chunk sequence validation for SignalR Disconnect
TanayParikh Jun 21, 2021
350f4de
Merge branch 'main' into taparik/streamingInterop
TanayParikh Jun 22, 2021
b5615c7
Update ReceiveDataTimeout
TanayParikh Jun 22, 2021
53f0392
Update RemoteJSDataStream.cs
TanayParikh Jun 22, 2021
3cd7619
ReceiveData Timeout UnhandledException
TanayParikh Jun 22, 2021
d549784
Improve test reliability
TanayParikh Jun 22, 2021
7589cd0
Merge branch 'main' into taparik/streamingInterop
TanayParikh Jun 23, 2021
9e63bb6
PR Feedback
TanayParikh Jun 25, 2021
f565366
Update RemoteJSDataStreamTest.cs
TanayParikh Jun 25, 2021
1b8ed19
Merge branch 'main' into taparik/streamingInterop
TanayParikh Jun 25, 2021
c1e0873
Update blazor.*.js
TanayParikh Jun 25, 2021
6e91d84
Cleanup & Fix Test
TanayParikh Jun 26, 2021
20e8489
Remote timeout related tests
TanayParikh Jun 27, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions src/Components/Server/src/Circuits/CircuitHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Buffers;
using System.Collections.Generic;
using System.Globalization;
using System.Security.Claims;
Expand Down Expand Up @@ -67,8 +68,11 @@ public CircuitHost(
Circuit = new Circuit(this);
Handle = new CircuitHandle() { CircuitHost = this, };

Renderer.UnhandledException += Renderer_UnhandledException;
// An unhandled exception from the renderer is always fatal because it came from user code.
Renderer.UnhandledException += ReportAndInvoke_UnhandledException;
Renderer.UnhandledSynchronizationException += SynchronizationContext_UnhandledException;

JSRuntime.UnhandledException += ReportAndInvoke_UnhandledException;
}

public CircuitHandle Handle { get; }
Expand Down Expand Up @@ -415,6 +419,31 @@ await Renderer.Dispatcher.InvokeAsync(() =>
}
}

// ReceiveJSDataChunk is used in a fire-and-forget context, so it's responsible for its own
// error handling.
internal async Task<bool> ReceiveJSDataChunk(long streamId, long chunkId, byte[] chunk, string error)
{
AssertInitialized();
AssertNotDisposed();

try
{
return await Renderer.Dispatcher.InvokeAsync(() =>
{
return RemoteJSDataStream.ReceiveData(JSRuntime, streamId, chunkId, chunk, error);
});
}
catch (Exception ex)
{
// An error completing JS interop means that the user sent invalid data, a well-behaved
// client won't do this.
Log.ReceiveJSDataChunkException(_logger, streamId, ex);
await TryNotifyClientErrorAsync(Client, GetClientErrorMessage(ex, "Invalid chunk supplied to stream."));
UnhandledException?.Invoke(this, new UnhandledExceptionEventArgs(ex, isTerminating: false));
return false;
}
}

// DispatchEvent is used in a fire-and-forget context, so it's responsible for its own
// error handling.
public async Task DispatchEvent(string eventDescriptorJson, string eventArgsJson)
Expand Down Expand Up @@ -548,9 +577,8 @@ private void AssertNotDisposed()
}
}

// An unhandled exception from the renderer is always fatal because it came from user code.
// We want to notify the client if it's still connected, and then tear-down the circuit.
private async void Renderer_UnhandledException(object sender, Exception e)
private async void ReportAndInvoke_UnhandledException(object sender, Exception e)
{
await ReportUnhandledException(e);
UnhandledException?.Invoke(this, new UnhandledExceptionEventArgs(e, isTerminating: false));
Expand Down Expand Up @@ -638,6 +666,7 @@ private static class Log
private static readonly Action<ILogger, long, Exception> _endInvokeJSSucceeded;
private static readonly Action<ILogger, long, Exception> _receiveByteArraySuccess;
private static readonly Action<ILogger, long, Exception> _receiveByteArrayException;
private static readonly Action<ILogger, long, Exception> _receiveJSDataChunkException;
private static readonly Action<ILogger, Exception> _dispatchEventFailedToParseEventData;
private static readonly Action<ILogger, string, Exception> _dispatchEventFailedToDispatchEvent;
private static readonly Action<ILogger, string, CircuitId, Exception> _locationChange;
Expand Down Expand Up @@ -682,6 +711,7 @@ private static class EventIds
public static readonly EventId OnRenderCompletedFailed = new EventId(212, "OnRenderCompletedFailed");
public static readonly EventId ReceiveByteArraySucceeded = new EventId(213, "ReceiveByteArraySucceeded");
public static readonly EventId ReceiveByteArrayException = new EventId(214, "ReceiveByteArrayException");
public static readonly EventId ReceiveJSDataChunkException = new EventId(215, "ReceiveJSDataChunkException");
}

static Log()
Expand Down Expand Up @@ -811,6 +841,11 @@ static Log()
EventIds.ReceiveByteArrayException,
"The ReceiveByteArray call with id '{id}' failed.");

_receiveJSDataChunkException = LoggerMessage.Define<long>(
LogLevel.Debug,
EventIds.ReceiveJSDataChunkException,
"The ReceiveJSDataChunk call with stream id '{streamId}' failed.");

_dispatchEventFailedToParseEventData = LoggerMessage.Define(
LogLevel.Debug,
EventIds.DispatchEventFailedToParseEventData,
Expand Down Expand Up @@ -875,6 +910,7 @@ public static void CircuitHandlerFailed(ILogger logger, CircuitHandler handler,
public static void EndInvokeJSSucceeded(ILogger logger, long asyncCall) => _endInvokeJSSucceeded(logger, asyncCall, null);
internal static void ReceiveByteArraySuccess(ILogger logger, long id) => _receiveByteArraySuccess(logger, id, null);
internal static void ReceiveByteArrayException(ILogger logger, long id, Exception ex) => _receiveByteArrayException(logger, id, ex);
internal static void ReceiveJSDataChunkException(ILogger logger, long streamId, Exception ex) => _receiveJSDataChunkException(logger, streamId, ex);
public static void DispatchEventFailedToParseEventData(ILogger logger, Exception ex) => _dispatchEventFailedToParseEventData(logger, ex);
public static void DispatchEventFailedToDispatchEvent(ILogger logger, string eventHandlerId, Exception ex) => _dispatchEventFailedToDispatchEvent(logger, eventHandlerId ?? "", ex);

Expand Down
224 changes: 224 additions & 0 deletions src/Components/Server/src/Circuits/RemoteJSDataStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.JSInterop;

namespace Microsoft.AspNetCore.Components.Server.Circuits
{
internal sealed class RemoteJSDataStream : Stream
{
private readonly RemoteJSRuntime _runtime;
private readonly long _streamId;
private readonly long _totalLength;
private readonly TimeSpan _jsInteropDefaultCallTimeout;
private readonly CancellationToken _streamCancellationToken;
private readonly Stream _pipeReaderStream;
private readonly Pipe _pipe;
private long _bytesRead;
private long _expectedChunkId;
private DateTimeOffset _lastDataReceivedTime;
private bool _disposed;

public static async Task<bool> ReceiveData(RemoteJSRuntime runtime, long streamId, long chunkId, byte[] chunk, string error)
{
if (!runtime.RemoteJSDataStreamInstances.TryGetValue(streamId, out var instance))
{
// There is no data stream with the given identifier. It may have already been disposed.
// We notify JS that the stream has been cancelled/disposed.
return false;
}

return await instance.ReceiveData(chunkId, chunk, error);
}

public static async ValueTask<RemoteJSDataStream> CreateRemoteJSDataStreamAsync(
RemoteJSRuntime runtime,
IJSStreamReference jsStreamReference,
long totalLength,
long maxBufferSize,
long maximumIncomingBytes,
TimeSpan jsInteropDefaultCallTimeout,
CancellationToken cancellationToken = default)
{
// Enforce minimum 1 kb, maximum 50 kb, SignalR message size.
// We budget 512 bytes overhead for the transfer, thus leaving at least 512 bytes for data
// transfer per chunk with a 1 kb message size.
// Additionally, to maintain interactivity, we put an upper limit of 50 kb on the message size.
var chunkSize = maximumIncomingBytes > 1024 ?
Math.Min(maximumIncomingBytes, 50*1024) - 512 :
throw new ArgumentException($"SignalR MaximumIncomingBytes must be at least 1 kb.");

var streamId = runtime.RemoteJSDataStreamNextInstanceId++;
var remoteJSDataStream = new RemoteJSDataStream(runtime, streamId, totalLength, maxBufferSize, jsInteropDefaultCallTimeout, cancellationToken);
await runtime.InvokeVoidAsync("Blazor._internal.sendJSDataStream", jsStreamReference, streamId, chunkSize);
return remoteJSDataStream;
}

private RemoteJSDataStream(
RemoteJSRuntime runtime,
long streamId,
long totalLength,
long maxBufferSize,
TimeSpan jsInteropDefaultCallTimeout,
CancellationToken cancellationToken)
{
_runtime = runtime;
_streamId = streamId;
_totalLength = totalLength;
_jsInteropDefaultCallTimeout = jsInteropDefaultCallTimeout;
_streamCancellationToken = cancellationToken;

_lastDataReceivedTime = DateTimeOffset.UtcNow;
_ = ThrowOnTimeout();

_runtime.RemoteJSDataStreamInstances.Add(_streamId, this);

_pipe = new Pipe(new PipeOptions(pauseWriterThreshold: maxBufferSize, resumeWriterThreshold: maxBufferSize / 2));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davidfowl since you are the pipelines expert, can you review our usage in this PR?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From @davidfowl

the size is too big
I would leave the defaults
and have it be configurable
maxBufferSize is being passed into the pause threshold

_pipeReaderStream = _pipe.Reader.AsStream();
}

private async Task<bool> ReceiveData(long chunkId, byte[] chunk, string error)
{
try
{
_lastDataReceivedTime = DateTimeOffset.UtcNow;
_ = ThrowOnTimeout();

if (!string.IsNullOrEmpty(error))
{
throw new InvalidOperationException($"An error occurred while reading the remote stream: {error}");
}

if (chunkId != _expectedChunkId)
{
throw new EndOfStreamException($"Out of sequence chunk received, expected {_expectedChunkId}, but received {chunkId}.");
}

++_expectedChunkId;

if (chunk.Length == 0)
{
throw new EndOfStreamException($"The incoming data chunk cannot be empty.");
}

_bytesRead += chunk.Length;

if (_bytesRead > _totalLength)
{
throw new EndOfStreamException($"The incoming data stream declared a length {_totalLength}, but {_bytesRead} bytes were sent.");
}

await _pipe.Writer.WriteAsync(chunk, _streamCancellationToken);

if (_bytesRead == _totalLength)
{
await CompletePipeAndDisposeStream();
}

return true;
}
catch (Exception e)
{
await CompletePipeAndDisposeStream(e);

// Fatal exception, crush the circuit. A well behaved client
// should not result in this type of exception.
if (e is EndOfStreamException)
{
throw;
}

return false;
}
}

public override bool CanRead => true;

public override bool CanSeek => false;

public override bool CanWrite => false;

public override long Length => _totalLength;

public override long Position
{
get => _pipeReaderStream.Position;
set => throw new NotSupportedException();
}

public override void Flush()
=> throw new NotSupportedException();

public override int Read(byte[] buffer, int offset, int count)
=> throw new NotSupportedException("Synchronous reads are not supported.");

public override long Seek(long offset, SeekOrigin origin)
=> throw new NotSupportedException();

public override void SetLength(long value)
=> throw new NotSupportedException();

public override void Write(byte[] buffer, int offset, int count)
=> throw new NotSupportedException();

public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
var linkedCancellationToken = GetLinkedCancellationToken(_streamCancellationToken, cancellationToken);
return await _pipeReaderStream.ReadAsync(buffer.AsMemory(offset, count), linkedCancellationToken);
}

public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
var linkedCancellationToken = GetLinkedCancellationToken(_streamCancellationToken, cancellationToken);
return await _pipeReaderStream.ReadAsync(buffer, linkedCancellationToken);
}

private static CancellationToken GetLinkedCancellationToken(CancellationToken a, CancellationToken b)
{
if (a.CanBeCanceled && b.CanBeCanceled)
{
return CancellationTokenSource.CreateLinkedTokenSource(a, b).Token;
}
else if (a.CanBeCanceled)
{
return a;
}

return b;
}

private async Task ThrowOnTimeout()
{
await Task.Delay(_jsInteropDefaultCallTimeout);

if (!_disposed && (DateTimeOffset.UtcNow >= _lastDataReceivedTime.Add(_jsInteropDefaultCallTimeout)))
{
// Dispose of the stream if a chunk isn't received within the jsInteropDefaultCallTimeout.
var timeoutException = new TimeoutException("Did not receive any data in the alloted time.");
await CompletePipeAndDisposeStream(timeoutException);
_runtime.RaiseUnhandledException(timeoutException);
}
}

internal async Task CompletePipeAndDisposeStream(Exception? ex = null)
{
await _pipe.Writer.CompleteAsync(ex);
Dispose(true);
}

protected override void Dispose(bool disposing)
{
if (disposing)
{
_runtime.RemoteJSDataStreamInstances.Remove(_streamId);
}

_disposed = true;
}
}
}
21 changes: 20 additions & 1 deletion src/Components/Server/src/Circuits/RemoteJSRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.IO;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
Expand All @@ -20,10 +24,18 @@ internal class RemoteJSRuntime : JSRuntime
private readonly long _maximumIncomingBytes;
private int _byteArraysToBeRevivedTotalBytes;

internal int RemoteJSDataStreamNextInstanceId;
internal readonly Dictionary<long, RemoteJSDataStream> RemoteJSDataStreamInstances = new();

public ElementReferenceContext ElementReferenceContext { get; }

public bool IsInitialized => _clientProxy is not null;

/// <summary>
/// Notifies when a runtime exception occurred.
/// </summary>
public event EventHandler<Exception>? UnhandledException;

public RemoteJSRuntime(
IOptions<CircuitOptions> circuitOptions,
IOptions<HubOptions> hubOptions,
Expand All @@ -46,6 +58,11 @@ internal void Initialize(CircuitClientProxy clientProxy)
_clientProxy = clientProxy ?? throw new ArgumentNullException(nameof(clientProxy));
}

internal void RaiseUnhandledException(Exception ex)
{
UnhandledException?.Invoke(this, ex);
}

protected override void EndInvokeDotNet(DotNetInvocationInfo invocationInfo, in DotNetInvocationResult invocationResult)
{
if (!invocationResult.Success)
Expand Down Expand Up @@ -140,6 +157,9 @@ public void MarkPermanentlyDisconnected()
_clientProxy = null;
}

protected override async Task<Stream> ReadJSDataAsStreamAsync(IJSStreamReference jsStreamReference, long totalLength, long maxBufferSize, CancellationToken cancellationToken)
=> await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(this, jsStreamReference, totalLength, maxBufferSize, _maximumIncomingBytes, _options.JSInteropDefaultCallTimeout, cancellationToken);

public static class Log
{
private static readonly Action<ILogger, long, string, Exception> _beginInvokeJS =
Expand Down Expand Up @@ -198,7 +218,6 @@ internal static void InvokeDotNetMethodSuccess(ILogger<RemoteJSRuntime> logger,
{
_invokeInstanceDotNetMethodSuccess(logger, invocationInfo.MethodIdentifier, invocationInfo.DotNetObjectId, invocationInfo.CallId, null);
}

}
}
}
Expand Down
Loading