diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.AbortAsync.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.AbortAsync.cs index f890ccb36e..8ccf329197 100644 --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.AbortAsync.cs +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.AbortAsync.cs @@ -2,13 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; -using System.Threading.Channels; using System.Threading.Tasks; -using Microsoft.AspNetCore.Sockets; -using Microsoft.AspNetCore.Sockets.Client; -using Microsoft.AspNetCore.Sockets.Client.Http; -using Microsoft.AspNetCore.Sockets.Client.Tests; -using Microsoft.Extensions.Logging.Abstractions; using Xunit; namespace Microsoft.AspNetCore.SignalR.Client.Tests @@ -19,10 +13,9 @@ public partial class HttpConnectionTests public class AbortAsync { [Fact] - public async Task AbortAsyncTriggersClosedEventWithException() + public Task AbortAsyncTriggersClosedEventWithException() { - var connection = CreateConnection(out var closedTask); - try + return WithConnectionAsync(CreateConnection(), async (connection, closed) => { // Start the connection await connection.StartAsync().OrTimeout(); @@ -32,22 +25,15 @@ public async Task AbortAsyncTriggersClosedEventWithException() await connection.AbortAsync(expected).OrTimeout(); // Verify that it is thrown - var actual = await Assert.ThrowsAsync(async () => await closedTask.OrTimeout()); + var actual = await Assert.ThrowsAsync(async () => await closed.OrTimeout()); Assert.Same(expected, actual); - } - finally - { - // Dispose should be clean and exception free. - await connection.DisposeAsync().OrTimeout(); - } + }); } [Fact] - public async Task AbortAsyncWhileStoppingTriggersClosedEventWithException() + public Task AbortAsyncWhileStoppingTriggersClosedEventWithException() { - var connection = CreateConnection(out var closedTask, stopHandler: SyncPoint.Create(2, out var syncPoints)); - - try + return WithConnectionAsync(CreateConnection(transport: new TestTransport(onTransportStop: SyncPoint.Create(2, out var syncPoints))), async (connection, closed) => { // Start the connection await connection.StartAsync().OrTimeout(); @@ -69,26 +55,19 @@ public async Task AbortAsyncWhileStoppingTriggersClosedEventWithException() syncPoints[0].Continue(); // We should close with the error from Abort (because it was set by the call to Abort even though Stop triggered the close) - var actual = await Assert.ThrowsAsync(async () => await closedTask.OrTimeout()); + var actual = await Assert.ThrowsAsync(async () => await closed.OrTimeout()); Assert.Same(expected, actual); // Clean-up syncPoints[1].Continue(); await Task.WhenAll(stopTask, abortTask).OrTimeout(); - } - finally - { - // Dispose should be clean and exception free. - await connection.DisposeAsync().OrTimeout(); - } + }); } [Fact] - public async Task StopAsyncWhileAbortingTriggersClosedEventWithoutException() + public Task StopAsyncWhileAbortingTriggersClosedEventWithoutException() { - var connection = CreateConnection(out var closedTask, stopHandler: SyncPoint.Create(2, out var syncPoints)); - - try + return WithConnectionAsync(CreateConnection(transport: new TestTransport(onTransportStop: SyncPoint.Create(2, out var syncPoints))), async (connection, closed) => { // Start the connection await connection.StartAsync().OrTimeout(); @@ -104,25 +83,18 @@ public async Task StopAsyncWhileAbortingTriggersClosedEventWithoutException() // This should clear the exception, meaning Closed will not "throw" syncPoints[1].Continue(); await connection.StopAsync(); - await closedTask.OrTimeout(); + await closed.OrTimeout(); // Clean-up syncPoints[0].Continue(); await abortTask.OrTimeout(); - } - finally - { - // Dispose should be clean and exception free. - await connection.DisposeAsync().OrTimeout(); - } + }); } [Fact] - public async Task StartAsyncCannotBeCalledWhileAbortAsyncInProgress() + public Task StartAsyncCannotBeCalledWhileAbortAsyncInProgress() { - var connection = CreateConnection(out var closedTask, stopHandler: SyncPoint.Create(out var syncPoint)); - - try + return WithConnectionAsync(CreateConnection(transport: new TestTransport(onTransportStop: SyncPoint.Create(out var syncPoint))), async (connection, closed) => { // Start the connection await connection.StartAsync().OrTimeout(); @@ -141,7 +113,7 @@ public async Task StartAsyncCannotBeCalledWhileAbortAsyncInProgress() // (it will throw the abort exception) syncPoint.Continue(); await abortTask.OrTimeout(); - var actual = await Assert.ThrowsAsync(() => closedTask.OrTimeout()); + var actual = await Assert.ThrowsAsync(() => closed.OrTimeout()); Assert.Same(expected, actual); // We can start now @@ -149,126 +121,7 @@ public async Task StartAsyncCannotBeCalledWhileAbortAsyncInProgress() // And we can stop without getting the abort exception. await connection.StopAsync().OrTimeout(); - } - finally - { - // Dispose should be clean and exception free. - await connection.DisposeAsync().OrTimeout(); - } - } - - private HttpConnection CreateConnection(out Task closedTask, Func stopHandler = null) - { - var httpHandler = new TestHttpMessageHandler(); - var transportFactory = new TestTransportFactory(new TestTransport(stopHandler)); - var connection = new HttpConnection( - new Uri("http://fakeuri.org/"), - transportFactory, - NullLoggerFactory.Instance, - new HttpOptions() - { - HttpMessageHandler = httpHandler, - }); - - var closedTcs = new TaskCompletionSource(); - connection.Closed += ex => - { - if (ex != null) - { - closedTcs.SetException(ex); - } - else - { - closedTcs.SetResult(null); - } - }; - closedTask = closedTcs.Task; - - return connection; - } - - private class TestTransport : ITransport - { - private Channel _application; - private readonly Func _stopHandler; - - public TransferMode? Mode => TransferMode.Text; - - public TestTransport(Func stopHandler) - { - _stopHandler = stopHandler ?? new Func(() => Task.CompletedTask); - } - - public Task StartAsync(Uri url, Channel application, TransferMode requestedTransferMode, string connectionId, IConnection connection) - { - _application = application; - return Task.CompletedTask; - } - - public async Task StopAsync() - { - await _stopHandler(); - _application.Writer.TryComplete(); - } - } - - // Possibly useful as a general-purpose async testing helper? - private class SyncPoint - { - private TaskCompletionSource _atSyncPoint = new TaskCompletionSource(); - private TaskCompletionSource _continueFromSyncPoint = new TaskCompletionSource(); - - // Used by the test code to wait and continue - public Task WaitForSyncPoint() => _atSyncPoint.Task; - public void Continue() => _continueFromSyncPoint.TrySetResult(null); - - // Used by the code under test to wait for the test code to release it. - public Task WaitToContinue() - { - _atSyncPoint.TrySetResult(null); - return _continueFromSyncPoint.Task; - } - - public static Func Create(out SyncPoint syncPoint) - { - var handler = Create(1, out var syncPoints); - syncPoint = syncPoints[0]; - return handler; - } - - /// - /// Creates a re-entrant function that waits for sync points in sequence. - /// - /// The number of sync points to expect - /// The objects that can be used to coordinate the sync point - /// - public static Func Create(int count, out SyncPoint[] syncPoints) - { - // Need to use a local so the closure can capture it. You can't use out vars in a closure. - var localSyncPoints = new SyncPoint[count]; - for (var i = 0; i < count; i += 1) - { - localSyncPoints[i] = new SyncPoint(); - } - - syncPoints = localSyncPoints; - - var counter = 0; - return () => - { - if (counter >= localSyncPoints.Length) - { - return Task.CompletedTask; - } - else - { - var syncPoint = localSyncPoints[counter]; - - counter += 1; - return syncPoint.WaitToContinue(); - } - }; - } + }); } } } diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.ConnectionLifecycle.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.ConnectionLifecycle.cs new file mode 100644 index 0000000000..a05c0eff67 --- /dev/null +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.ConnectionLifecycle.cs @@ -0,0 +1,369 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Net; +using System.Net.Http; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Client.Tests; +using Microsoft.AspNetCore.Sockets.Client; +using Microsoft.Extensions.Logging.Testing; +using Xunit; +using Xunit.Abstractions; + +namespace Microsoft.AspNetCore.SignalR.Client.Tests +{ + public partial class HttpConnectionTests + { + public class ConnectionLifecycle : LoggedTest + { + public ConnectionLifecycle(ITestOutputHelper output) : base(output) + { + } + + [Fact] + public async Task CannotStartRunningConnection() + { + using (StartLog(out var loggerFactory)) + { + await WithConnectionAsync(CreateConnection(loggerFactory: loggerFactory), async (connection, closed) => + { + await connection.StartAsync().OrTimeout(); + var exception = + await Assert.ThrowsAsync( + async () => await connection.StartAsync().OrTimeout()); + Assert.Equal("Cannot start a connection that is not in the Disconnected state.", exception.Message); + }); + } + } + + + [Fact] + public async Task CannotStartConnectionDisposedAfterStarting() + { + using (StartLog(out var loggerFactory)) + { + await WithConnectionAsync( + CreateConnection(loggerFactory: loggerFactory), + async (connection, closed) => + { + await connection.StartAsync().OrTimeout(); + await connection.DisposeAsync(); + var exception = + await Assert.ThrowsAsync( + async () => await connection.StartAsync().OrTimeout()); + + Assert.Equal("Cannot start a connection that is not in the Disconnected state.", exception.Message); + }); + } + } + + [Fact] + public async Task CannotStartDisposedConnection() + { + using (StartLog(out var loggerFactory)) + { + await WithConnectionAsync( + CreateConnection(loggerFactory: loggerFactory), + async (connection, closed) => + { + await connection.DisposeAsync(); + var exception = + await Assert.ThrowsAsync( + async () => await connection.StartAsync().OrTimeout()); + + Assert.Equal("Cannot start a connection that is not in the Disconnected state.", exception.Message); + }); + } + } + + [Fact] + public async Task CanDisposeStartingConnection() + { + using (StartLog(out var loggerFactory)) + { + await WithConnectionAsync( + CreateConnection( + loggerFactory: loggerFactory, + transport: new TestTransport( + onTransportStart: SyncPoint.Create(out var transportStart), + onTransportStop: SyncPoint.Create(out var transportStop))), + async (connection, closed) => + { + // Start the connection and wait for the transport to start up. + var startTask = connection.StartAsync(); + await transportStart.WaitForSyncPoint().OrTimeout(); + + // While the transport is starting, dispose the connection + var disposeTask = connection.DisposeAsync(); + transportStart.Continue(); // We need to release StartAsync, because Dispose waits for it. + + // Wait for start to finish, as that has to finish before the transport will be stopped. + await startTask.OrTimeout(); + + // Then release DisposeAsync (via the transport StopAsync call) + await transportStop.WaitForSyncPoint().OrTimeout(); + transportStop.Continue(); + }); + } + } + + [Fact] + public async Task CanStartConnectionThatFailedToStart() + { + using (StartLog(out var loggerFactory)) + { + var expected = new Exception("Transport failed to start"); + var shouldFail = true; + + Task OnTransportStart() + { + if (shouldFail) + { + // Succeed next time + shouldFail = false; + return Task.FromException(expected); + } + else + { + return Task.CompletedTask; + } + } + + await WithConnectionAsync( + CreateConnection( + loggerFactory: loggerFactory, + transport: new TestTransport(onTransportStart: OnTransportStart)), + async (connection, closed) => + { + var actual = await Assert.ThrowsAsync(() => connection.StartAsync()); + Assert.Same(expected, actual); + + // Should succeed this time + shouldFail = false; + + await connection.StartAsync().OrTimeout(); + }); + } + } + + [Fact] + public async Task CanStartStoppedConnection() + { + using (StartLog(out var loggerFactory)) + { + await WithConnectionAsync( + CreateConnection(loggerFactory: loggerFactory), + async (connection, closed) => + { + await connection.StartAsync().OrTimeout(); + await connection.StopAsync().OrTimeout(); + await connection.StartAsync().OrTimeout(); + }); + } + } + + [Fact] + public async Task CanStopStartingConnection() + { + using (StartLog(out var loggerFactory)) + { + await WithConnectionAsync( + CreateConnection( + loggerFactory: loggerFactory, + transport: new TestTransport(onTransportStart: SyncPoint.Create(out var transportStart))), + async (connection, closed) => + { + // Start and wait for the transport to start up. + var startTask = connection.StartAsync(); + await transportStart.WaitForSyncPoint().OrTimeout(); + + // Stop the connection while it's starting + var stopTask = connection.StopAsync(); + transportStart.Continue(); // We need to release Start in order for Stop to begin working. + + // Wait for start to finish, which will allow stop to finish and the connection to close. + await startTask.OrTimeout(); + await stopTask.OrTimeout(); + await closed.OrTimeout(); + }); + } + } + + [Fact] + public async Task StoppingStoppingConnectionNoOps() + { + using (StartLog(out var loggerFactory)) + { + await WithConnectionAsync( + CreateConnection(loggerFactory: loggerFactory), + async (connection, closed) => + { + await connection.StartAsync().OrTimeout(); + await Task.WhenAll(connection.StopAsync(), connection.StopAsync()).OrTimeout(); + await closed.OrTimeout(); + }); + } + } + + [Fact] + public async Task CanStartConnectionAfterConnectionStoppedWithError() + { + using (StartLog(out var loggerFactory)) + { + var httpHandler = new TestHttpMessageHandler(); + + var longPollResult = new TaskCompletionSource(); + httpHandler.OnLongPoll(cancellationToken => longPollResult.Task.OrTimeout()); + + httpHandler.OnSocketSend((data, _) => + { + Assert.Collection(data, i => Assert.Equal(0x42, i)); + return Task.FromResult(ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError)); + }); + + await WithConnectionAsync( + CreateConnection(httpHandler, loggerFactory), + async (connection, closed) => + { + await connection.StartAsync().OrTimeout(); + await Assert.ThrowsAsync(() => connection.SendAsync(new byte[] { 0x42 }).OrTimeout()); + + longPollResult.TrySetResult(ResponseUtils.CreateResponse(HttpStatusCode.NoContent)); + + // Wait for the connection to close, because the send failed. + await Assert.ThrowsAsync(() => closed.OrTimeout()); + + // Start it up again + await connection.StartAsync().OrTimeout(); + }); + } + } + + [Fact] + public async Task DisposedStoppingConnectionDisposesConnection() + { + using (StartLog(out var loggerFactory)) + { + await WithConnectionAsync( + CreateConnection( + loggerFactory: loggerFactory, + transport: new TestTransport(onTransportStop: SyncPoint.Create(out var transportStop))), + async (connection, closed) => + { + // Start the connection + await connection.StartAsync().OrTimeout(); + + // Stop the connection + var stopTask = connection.StopAsync().OrTimeout(); + + // Once the transport starts shutting down + await transportStop.WaitForSyncPoint(); + + // Start disposing and allow it to finish shutting down + var disposeTask = connection.DisposeAsync().OrTimeout(); + transportStop.Continue(); + + // Wait for the tasks to complete + await stopTask.OrTimeout(); + await closed.OrTimeout(); + await disposeTask.OrTimeout(); + + // We should be disposed and thus unable to restart. + var exception = await Assert.ThrowsAsync(() => connection.StartAsync().OrTimeout()); + Assert.Equal("Cannot start a connection that is not in the Disconnected state.", exception.Message); + }); + } + } + + [Fact] + public async Task CanDisposeStoppedConnection() + { + using (StartLog(out var loggerFactory)) + { + await WithConnectionAsync( + CreateConnection(loggerFactory: loggerFactory), + async (connection, closed) => + { + await connection.StartAsync().OrTimeout(); + await connection.StopAsync().OrTimeout(); + await closed.OrTimeout(); + await connection.DisposeAsync().OrTimeout(); + }); + } + } + + [Fact] + public Task ClosedEventRaisedWhenTheClientIsDisposed() + { + return WithConnectionAsync( + CreateConnection(), + async (connection, closed) => + { + await connection.StartAsync().OrTimeout(); + await connection.DisposeAsync().OrTimeout(); + await closed.OrTimeout(); + }); + } + + [Fact] + public async Task ConnectionClosedWhenTransportFails() + { + var testTransport = new TestTransport(); + + var expected = new Exception("Whoops!"); + + await WithConnectionAsync( + CreateConnection(transport: testTransport), + async (connection, closed) => + { + await connection.StartAsync().OrTimeout(); + testTransport.Application.Writer.TryComplete(expected); + var actual = await Assert.ThrowsAsync(() => closed.OrTimeout()); + Assert.Same(expected, actual); + + var sendException = await Assert.ThrowsAsync(() => connection.SendAsync(new byte[0]).OrTimeout()); + Assert.Equal("Cannot send messages when the connection is not in the Connected state.", sendException.Message); + }); + } + + [Fact] + public Task ClosedEventNotRaisedWhenTheClientIsStoppedButWasNeverStarted() + { + return WithConnectionAsync( + CreateConnection(), + async (connection, closed) => + { + await connection.DisposeAsync().OrTimeout(); + Assert.False(closed.IsCompleted); + }); + } + + [Fact] + public async Task TransportIsStoppedWhenConnectionIsStopped() + { + var testHttpHandler = new TestHttpMessageHandler(); + + // Just keep returning data when polled + testHttpHandler.OnLongPoll(_ => ResponseUtils.CreateResponse(HttpStatusCode.OK)); + + using (var httpClient = new HttpClient(testHttpHandler)) + { + var longPollingTransport = new LongPollingTransport(httpClient); + await WithConnectionAsync( + CreateConnection(transport: longPollingTransport), + async (connection, closed) => + { + // Start the transport + await connection.StartAsync().OrTimeout(); + Assert.False(longPollingTransport.Running.IsCompleted, "Expected that the transport would still be running"); + + // Stop the connection, and we should stop the transport + await connection.StopAsync().OrTimeout(); + await longPollingTransport.Running.OrTimeout(); + }); + } + } + } + } +} diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.Helpers.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.Helpers.cs new file mode 100644 index 0000000000..36d4ca0146 --- /dev/null +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.Helpers.cs @@ -0,0 +1,131 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Net.Http; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Sockets; +using Microsoft.AspNetCore.Sockets.Client; +using Microsoft.AspNetCore.Sockets.Client.Http; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; + +namespace Microsoft.AspNetCore.SignalR.Client.Tests +{ + public partial class HttpConnectionTests + { + private static HttpConnection CreateConnection(HttpMessageHandler httpHandler = null, ILoggerFactory loggerFactory = null, string url = null, ITransport transport = null) + { + loggerFactory = loggerFactory ?? NullLoggerFactory.Instance; + var httpOptions = new HttpOptions() + { + HttpMessageHandler = httpHandler ?? TestHttpMessageHandler.CreateDefault(), + }; + var uri = new Uri(url ?? "http://fakeuri.org/"); + + var connection = (transport != null) ? + new HttpConnection(uri, new TestTransportFactory(transport), loggerFactory, httpOptions) : + new HttpConnection(uri, TransportType.LongPolling, loggerFactory, httpOptions); + + return connection; + } + + private static async Task WithConnectionAsync(HttpConnection connection, Func body) + { + try + { + var closedTcs = new TaskCompletionSource(); + connection.Closed += ex => + { + if (ex != null) + { + closedTcs.SetException(ex); + } + else + { + closedTcs.SetResult(null); + } + }; + + // Using OrTimeout here will hide any timeout issues in the test :(. + await body(connection, closedTcs.Task); + } + finally + { + await connection.DisposeAsync().OrTimeout(); + } + } + + // Possibly useful as a general-purpose async testing helper? + private class SyncPoint + { + private TaskCompletionSource _atSyncPoint = new TaskCompletionSource(); + private TaskCompletionSource _continueFromSyncPoint = new TaskCompletionSource(); + + /// + /// Waits for the code-under-test to reach . + /// + /// + public Task WaitForSyncPoint() => _atSyncPoint.Task; + + /// + /// Releases the code-under-test to continue past where it waited for . + /// + public void Continue() => _continueFromSyncPoint.TrySetResult(null); + + /// + /// Used by the code-under-test to wait for the test code to sync up. + /// + /// + /// This code will unblock and then block waiting for to be called. + /// + /// + public Task WaitToContinue() + { + _atSyncPoint.TrySetResult(null); + return _continueFromSyncPoint.Task; + } + + public static Func Create(out SyncPoint syncPoint) + { + var handler = Create(1, out var syncPoints); + syncPoint = syncPoints[0]; + return handler; + } + + /// + /// Creates a re-entrant function that waits for sync points in sequence. + /// + /// The number of sync points to expect + /// The objects that can be used to coordinate the sync point + /// + public static Func Create(int count, out SyncPoint[] syncPoints) + { + // Need to use a local so the closure can capture it. You can't use out vars in a closure. + var localSyncPoints = new SyncPoint[count]; + for (var i = 0; i < count; i += 1) + { + localSyncPoints[i] = new SyncPoint(); + } + + syncPoints = localSyncPoints; + + var counter = 0; + return () => + { + if (counter >= localSyncPoints.Length) + { + return Task.CompletedTask; + } + else + { + var syncPoint = localSyncPoints[counter]; + + counter += 1; + return syncPoint.WaitToContinue(); + } + }; + } + } + } +} diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.Negotiate.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.Negotiate.cs new file mode 100644 index 0000000000..e1fecc023c --- /dev/null +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.Negotiate.cs @@ -0,0 +1,94 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Net; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Client.Tests; +using Xunit; + +using TransportType = Microsoft.AspNetCore.Sockets.TransportType; + +namespace Microsoft.AspNetCore.SignalR.Client.Tests +{ + public partial class HttpConnectionTests + { + public class Negotiate + { + [Theory] + [InlineData("")] + [InlineData("Not Json")] + public Task StartThrowsFormatExceptionIfNegotiationResponseIsInvalid(string negotiatePayload) + { + return RunInvalidNegotiateResponseTest(negotiatePayload, "Invalid negotiation response received."); + } + + [Fact] + public Task StartThrowsFormatExceptionIfNegotiationResponseHasNoConnectionId() + { + return RunInvalidNegotiateResponseTest(ResponseUtils.CreateNegotiationContent(connectionId: null), "Invalid connection id returned in negotiation response."); + } + + [Fact] + public Task StartThrowsFormatExceptionIfNegotiationResponseHasNoTransports() + { + return RunInvalidNegotiateResponseTest(ResponseUtils.CreateNegotiationContent(transportTypes: null), "No transports returned in negotiation response."); + } + + [Theory] + [InlineData((TransportType)0)] + [InlineData(TransportType.ServerSentEvents)] + public Task ConnectionCannotBeStartedIfNoCommonTransportsBetweenClientAndServer(TransportType serverTransports) + { + return RunInvalidNegotiateResponseTest(ResponseUtils.CreateNegotiationContent(transportTypes: serverTransports), "No requested transports available on the server."); + } + + [Theory] + [InlineData("http://fakeuri.org/", "http://fakeuri.org/negotiate")] + [InlineData("http://fakeuri.org/?q=1/0", "http://fakeuri.org/negotiate?q=1/0")] + [InlineData("http://fakeuri.org?q=1/0", "http://fakeuri.org/negotiate?q=1/0")] + [InlineData("http://fakeuri.org/endpoint", "http://fakeuri.org/endpoint/negotiate")] + [InlineData("http://fakeuri.org/endpoint/", "http://fakeuri.org/endpoint/negotiate")] + [InlineData("http://fakeuri.org/endpoint?q=1/0", "http://fakeuri.org/endpoint/negotiate?q=1/0")] + public async Task CorrectlyHandlesQueryStringWhenAppendingNegotiateToUrl(string requestedUrl, string expectedNegotiate) + { + var testHttpHandler = new TestHttpMessageHandler(autoNegotiate: false); + + var negotiateUrlTcs = new TaskCompletionSource(); + testHttpHandler.OnLongPoll(cancellationToken => ResponseUtils.CreateResponse(HttpStatusCode.NoContent)); + testHttpHandler.OnNegotiate((request, cancellationToken) => + { + negotiateUrlTcs.TrySetResult(request.RequestUri.ToString()); + return ResponseUtils.CreateResponse(HttpStatusCode.OK, + ResponseUtils.CreateNegotiationContent()); + }); + + await WithConnectionAsync( + CreateConnection(testHttpHandler, url: requestedUrl), + async (connection, closed) => + { + await connection.StartAsync().OrTimeout(); + }); + + Assert.Equal(expectedNegotiate, await negotiateUrlTcs.Task.OrTimeout()); + } + + private async Task RunInvalidNegotiateResponseTest(string negotiatePayload, string expectedExceptionMessage) where TException : Exception + { + var testHttpHandler = new TestHttpMessageHandler(autoNegotiate: false); + + testHttpHandler.OnNegotiate((_, cancellationToken) => ResponseUtils.CreateResponse(HttpStatusCode.OK, negotiatePayload)); + + await WithConnectionAsync( + CreateConnection(testHttpHandler), + async (connection, closed) => + { + var exception = await Assert.ThrowsAsync( + () => connection.StartAsync().OrTimeout()); + + Assert.Equal(expectedExceptionMessage, exception.Message); + }); + } + } + } +} diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.OnReceived.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.OnReceived.cs new file mode 100644 index 0000000000..f922fb2716 --- /dev/null +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.OnReceived.cs @@ -0,0 +1,107 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Net; +using System.Text; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Client.Tests; +using Xunit; + +namespace Microsoft.AspNetCore.SignalR.Client.Tests +{ + public partial class HttpConnectionTests + { + public class OnReceived + { + [Fact] + public async Task CanReceiveData() + { + var testHttpHandler = new TestHttpMessageHandler(); + + testHttpHandler.OnLongPoll(cancellationToken => ResponseUtils.CreateResponse(HttpStatusCode.OK, "42")); + testHttpHandler.OnSocketSend((_, __) => ResponseUtils.CreateResponse(HttpStatusCode.Accepted)); + + await WithConnectionAsync( + CreateConnection(testHttpHandler), + async (connection, closed) => + { + var receiveTcs = new TaskCompletionSource(); + connection.OnReceived((data, state) => + { + var tcs = ((TaskCompletionSource)state); + tcs.TrySetResult(Encoding.UTF8.GetString(data)); + return Task.CompletedTask; + }, receiveTcs); + + await connection.StartAsync().OrTimeout(); + Assert.Equal("42", await receiveTcs.Task.OrTimeout()); + }); + } + + [Fact] + public async Task CanReceiveDataEvenIfExceptionThrownFromPreviousReceivedEvent() + { + var testHttpHandler = new TestHttpMessageHandler(); + + testHttpHandler.OnLongPoll(cancellationToken => ResponseUtils.CreateResponse(HttpStatusCode.OK, "42")); + testHttpHandler.OnSocketSend((_, __) => ResponseUtils.CreateResponse(HttpStatusCode.Accepted)); + + await WithConnectionAsync( + CreateConnection(testHttpHandler), + async (connection, closed) => + { + var receiveTcs = new TaskCompletionSource(); + var receivedRaised = false; + connection.OnReceived((data, state) => + { + if (!receivedRaised) + { + receivedRaised = true; + return Task.FromException(new InvalidOperationException()); + } + + receiveTcs.TrySetResult(Encoding.UTF8.GetString(data)); + return Task.CompletedTask; + }, receiveTcs); + + await connection.StartAsync().OrTimeout(); + Assert.Equal("42", await receiveTcs.Task.OrTimeout()); + Assert.True(receivedRaised); + }); + } + + [Fact] + public async Task CanReceiveDataEvenIfExceptionThrownSynchronouslyFromPreviousReceivedEvent() + { + var testHttpHandler = new TestHttpMessageHandler(); + + testHttpHandler.OnLongPoll(cancellationToken => ResponseUtils.CreateResponse(HttpStatusCode.OK, "42")); + testHttpHandler.OnSocketSend((_, __) => ResponseUtils.CreateResponse(HttpStatusCode.Accepted)); + + await WithConnectionAsync( + CreateConnection(testHttpHandler), + async (connection, closed) => + { + var receiveTcs = new TaskCompletionSource(); + var receivedRaised = false; + connection.OnReceived((data, state) => + { + if (!receivedRaised) + { + receivedRaised = true; + throw new InvalidOperationException(); + } + + receiveTcs.TrySetResult(Encoding.UTF8.GetString(data)); + return Task.CompletedTask; + }, receiveTcs); + + await connection.StartAsync().OrTimeout(); + Assert.Equal("42", await receiveTcs.Task.OrTimeout()); + Assert.True(receivedRaised); + }); + } + } + } +} diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.SendAsync.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.SendAsync.cs new file mode 100644 index 0000000000..beee62e7ed --- /dev/null +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.SendAsync.cs @@ -0,0 +1,122 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Net; +using System.Net.Http; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Client.Tests; +using Xunit; + +namespace Microsoft.AspNetCore.SignalR.Client.Tests +{ + public partial class HttpConnectionTests + { + public class SendAsync + { + [Fact] + public async Task CanSendData() + { + var data = new byte[] { 1, 1, 2, 3, 5, 8 }; + + var testHttpHandler = new TestHttpMessageHandler(); + + var sendTcs = new TaskCompletionSource(); + var longPollTcs = new TaskCompletionSource(); + + testHttpHandler.OnLongPoll(cancellationToken => longPollTcs.Task); + + testHttpHandler.OnSocketSend((buf, cancellationToken) => + { + sendTcs.TrySetResult(buf); + return Task.FromResult(ResponseUtils.CreateResponse(HttpStatusCode.Accepted)); + }); + + await WithConnectionAsync( + CreateConnection(testHttpHandler), + async (connection, closed) => + { + await connection.StartAsync().OrTimeout(); + + await connection.SendAsync(data).OrTimeout(); + + Assert.Equal(data, await sendTcs.Task.OrTimeout()); + + longPollTcs.TrySetResult(ResponseUtils.CreateResponse(HttpStatusCode.NoContent)); + }); + } + + [Fact] + public Task SendThrowsIfConnectionIsNotStarted() + { + return WithConnectionAsync( + CreateConnection(), + async (connection, closed) => + { + var exception = await Assert.ThrowsAsync( + () => connection.SendAsync(new byte[0]).OrTimeout()); + Assert.Equal("Cannot send messages when the connection is not in the Connected state.", exception.Message); + }); + } + + [Fact] + public Task SendThrowsIfConnectionIsStopped() + { + return WithConnectionAsync( + CreateConnection(), + async (connection, closed) => + { + await connection.StartAsync().OrTimeout(); + await connection.StopAsync().OrTimeout(); + + var exception = await Assert.ThrowsAsync( + () => connection.SendAsync(new byte[0]).OrTimeout()); + Assert.Equal("Cannot send messages when the connection is not in the Connected state.", exception.Message); + }); + } + + [Fact] + public Task SendThrowsIfConnectionIsDisposed() + { + return WithConnectionAsync( + CreateConnection(), + async (connection, closed) => + { + await connection.StartAsync().OrTimeout(); + await connection.DisposeAsync().OrTimeout(); + + var exception = await Assert.ThrowsAsync( + () => connection.SendAsync(new byte[0]).OrTimeout()); + Assert.Equal("Cannot send messages when the connection is not in the Connected state.", exception.Message); + }); + } + + [Fact] + public async Task CallerReceivesExceptionsFromSendAsync() + { + var testHttpHandler = new TestHttpMessageHandler(); + + var longPollTcs = new TaskCompletionSource(); + + testHttpHandler.OnLongPoll(cancellationToken => longPollTcs.Task); + + testHttpHandler.OnSocketSend((buf, cancellationToken) => + { + return Task.FromResult(ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError)); + }); + + await WithConnectionAsync( + CreateConnection(testHttpHandler), + async (connection, closed) => + { + await connection.StartAsync().OrTimeout(); + + var exception = await Assert.ThrowsAsync( + async () => await connection.SendAsync(new byte[0]).OrTimeout()); + + longPollTcs.TrySetResult(null); + }); + } + } + } +} diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.cs index f62e655b38..8f2b70df44 100644 --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.cs @@ -2,24 +2,20 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; -using System.Diagnostics; -using System.Net; -using System.Net.Http; -using System.Text; using System.Threading; -using System.Threading.Channels; using System.Threading.Tasks; -using Microsoft.AspNetCore.Client.Tests; -using Microsoft.AspNetCore.Sockets.Client.Http; +using Microsoft.AspNetCore.Sockets; +using Microsoft.AspNetCore.Sockets.Client; using Microsoft.AspNetCore.Sockets.Features; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Testing; -using Moq; -using Moq.Protected; using Xunit; using Xunit.Abstractions; -namespace Microsoft.AspNetCore.Sockets.Client.Tests +// This is needed because there's a System.Net.TransportType in net461 (it's internal in netcoreapp). +using TransportType = Microsoft.AspNetCore.Sockets.TransportType; + +namespace Microsoft.AspNetCore.SignalR.Client.Tests { public partial class HttpConnectionTests : LoggedTest { @@ -51,504 +47,46 @@ public void CannotStartConnectionWithInvalidTransportType(TransportType requeste } [Fact] - public async Task CannotStartRunningConnection() - { - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - - return ResponseUtils.IsNegotiateRequest(request) - ? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse()) - : ResponseUtils.CreateResponse(HttpStatusCode.OK); - }); - - var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null, - httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); - try - { - await connection.StartAsync(); - var exception = - await Assert.ThrowsAsync( - async () => await connection.StartAsync()); - Assert.Equal("Cannot start a connection that is not in the Disconnected state.", exception.Message); - } - finally - { - await connection.DisposeAsync(); - } - } - - [Fact] - public async Task CannotStartConnectionDisposedAfterStarting() - { - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - return ResponseUtils.IsNegotiateRequest(request) - ? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse()) - : ResponseUtils.CreateResponse(HttpStatusCode.OK); - }); - - var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null, - httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); - - await connection.StartAsync(); - await connection.DisposeAsync(); - var exception = - await Assert.ThrowsAsync( - async () => await connection.StartAsync()); - - Assert.Equal("Cannot start a connection that is not in the Disconnected state.", exception.Message); - } - - [Fact] - public async Task CannotStartDisposedConnection() - { - using (var httpClient = new HttpClient()) - { - var connection = new HttpConnection(new Uri("http://fakeuri.org/")); - await connection.DisposeAsync(); - var exception = - await Assert.ThrowsAsync( - async () => await connection.StartAsync()); - - Assert.Equal("Cannot start a connection that is not in the Disconnected state.", exception.Message); - } - } - - [Fact] - public async Task CanDisposeStartingConnection() - { - // Used to make sure StartAsync is not completed before DisposeAsync is called - var releaseNegotiateTcs = new TaskCompletionSource(); - // Used to make sure that DisposeAsync runs after we check the state in StartAsync - var allowDisposeTcs = new TaskCompletionSource(); - // Used to make sure that DisposeAsync continues only after StartAsync finished - var releaseDisposeTcs = new TaskCompletionSource(); - - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - // allow DisposeAsync to continue once we know we are past the connection state check - allowDisposeTcs.SetResult(null); - await releaseNegotiateTcs.Task; - return ResponseUtils.IsNegotiateRequest(request) - ? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse()) - : ResponseUtils.CreateResponse(HttpStatusCode.OK); - }); - - var transport = new Mock(); - Channel channel = null; - transport.SetupGet(t => t.Mode).Returns(TransferMode.Text); - transport.Setup(t => t.StartAsync(It.IsAny(), It.IsAny>(), It.IsAny(), It.IsAny(), It.IsAny())) - .Returns, TransferMode, string, IConnection>((_, c, __, ___, ____) => - { - channel = c; - return Task.CompletedTask; - }); - transport.Setup(t => t.StopAsync()).Returns(async () => - { - await releaseDisposeTcs.Task; - channel.Writer.TryComplete(); - }); - var connection = new HttpConnection(new Uri("http://fakeuri.org/"), new TestTransportFactory(transport.Object), loggerFactory: null, - httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); - - var startTask = connection.StartAsync(); - await allowDisposeTcs.Task; - var disposeTask = connection.DisposeAsync(); - // allow StartAsync to continue once DisposeAsync has started - releaseNegotiateTcs.SetResult(null); - - // unblock DisposeAsync only after StartAsync completed - await startTask.OrTimeout(); - releaseDisposeTcs.SetResult(null); - await disposeTask.OrTimeout(); - } - - [Fact] - public async Task CanStartConnectionThatFailedToStart() - { - var failNegotiate = true; - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - - if (ResponseUtils.IsNegotiateRequest(request)) - { - return failNegotiate - ? ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError) - : ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse()); - } - - return ResponseUtils.CreateResponse(HttpStatusCode.OK); - }); - - var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null, - httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); - - try - { - await connection.StartAsync().OrTimeout(); - } - catch { } - failNegotiate = false; - await connection.StartAsync().OrTimeout(); - await connection.DisposeAsync().OrTimeout(); - } - - [Fact] - public async Task CanStartStoppedConnection() - { - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - return ResponseUtils.IsNegotiateRequest(request) - ? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse()) - : ResponseUtils.CreateResponse(HttpStatusCode.OK); - }); - - var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null, - httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); - - await connection.StartAsync().OrTimeout(); - await connection.StopAsync().OrTimeout(); - await connection.StartAsync().OrTimeout(); - await connection.DisposeAsync().OrTimeout(); - } - - [Fact] - public async Task CanStopStartingConnection() + public async Task EventsAreNotRunningOnMainLoop() { - var allowStopTcs = new TaskCompletionSource(); - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - if (ResponseUtils.IsNegotiateRequest(request)) - { - allowStopTcs.SetResult(null); - return ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse()); - } - else - { - var content = request.Content != null ? await request.Content.ReadAsByteArrayAsync() : null; - return (content?.Length == 1 && content[0] == 0x42) - ? ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError) - : ResponseUtils.CreateResponse(HttpStatusCode.OK); - } - }); - - var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null, - httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); + var testTransport = new TestTransport(); - var closeTcs = new TaskCompletionSource(); - connection.Closed += e => closeTcs.TrySetResult(null); - - var startTask = connection.StartAsync(); - await allowStopTcs.Task.OrTimeout(); - - await Task.WhenAll(startTask, connection.StopAsync()).OrTimeout(); - await closeTcs.Task.OrTimeout(); - } - - [Fact] - public async Task CanStartConnectionAfterConnectionStoppedWithError() - { - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => + await WithConnectionAsync( + CreateConnection(transport: testTransport), + async (connection, closed) => { - await Task.Yield(); - if (ResponseUtils.IsNegotiateRequest(request)) - { - return ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse()); - } - - var content = request.Content != null ? await request.Content.ReadAsByteArrayAsync() : null; - return (content?.Length == 1 && content[0] == 0x42) - ? ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError) - : ResponseUtils.CreateResponse(HttpStatusCode.OK); - }); - - var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null, - httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); - - var closeTcs = new TaskCompletionSource(); - connection.Closed += e => closeTcs.TrySetResult(null); - - await connection.StartAsync().OrTimeout(); - try - { - await connection.SendAsync(new byte[] { 0x42 }).OrTimeout(); - } - catch { } - await closeTcs.Task.OrTimeout(); - await connection.StartAsync().OrTimeout(); - await connection.DisposeAsync().OrTimeout(); - } - - [Fact] - public async Task CanDisposeStoppedConnection() - { - var connection = new HttpConnection(new Uri("http://fakeuri.org/")); - await connection.StopAsync().OrTimeout(); - await connection.DisposeAsync().OrTimeout(); - } + // Block up the OnReceived callback until we finish the test. + var onReceived = new SyncPoint(); + connection.OnReceived(_ => onReceived.WaitToContinue().OrTimeout()); - [Fact] - public async Task StoppingStoppingConnectionNoOps() - { - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - if (ResponseUtils.IsNegotiateRequest(request)) - { - return ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse()); - } - else - { - var content = request.Content != null ? await request.Content.ReadAsByteArrayAsync() : null; - return (content?.Length == 1 && content[0] == 0x42) - ? ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError) - : ResponseUtils.CreateResponse(HttpStatusCode.OK); - } - }); + await connection.StartAsync().OrTimeout(); - var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null, - httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); + // This will trigger the received callback + testTransport.Application.Writer.TryWrite(Array.Empty()); - var closeTcs = new TaskCompletionSource(); - connection.Closed += e => closeTcs.TrySetResult(null); + // Wait to hit the sync point. We are now blocking up the TaskQueue + await onReceived.WaitForSyncPoint().OrTimeout(); - await connection.StartAsync().OrTimeout(); - await Task.WhenAll(connection.StopAsync().OrTimeout(), connection.StopAsync().OrTimeout()); - await closeTcs.Task.OrTimeout(); - } + // Now we write something else and we want to test that the HttpConnection receive loop is still + // removing items from the channel even though OnReceived is blocked up. + testTransport.Application.Writer.TryWrite(Array.Empty()); - [Fact] - public async Task DisposedStoppingConnectionDisposesConnection() - { - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - if (ResponseUtils.IsNegotiateRequest(request)) + // Now that we've written, we wait for WaitToReadAsync to return an INCOMPLETE task. It will do so + // once HttpConnection reads the message. We also use a CTS to timeout in case the loop is indeed blocked + var cts = new CancellationTokenSource(); + cts.CancelAfter(TimeSpan.FromSeconds(5)); + while (testTransport.Application.Reader.WaitToReadAsync().IsCompleted && !cts.IsCancellationRequested) { - return ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse()); - } - else - { - var content = request.Content != null ? await request.Content.ReadAsByteArrayAsync() : null; - return (content?.Length == 1 && content[0] == 0x42) - ? ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError) - : ResponseUtils.CreateResponse(HttpStatusCode.OK); + // Yield to allow the HttpConnection to dequeue the message + await Task.Yield(); } - }); - - var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null, - httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); - - var closeTcs = new TaskCompletionSource(); - connection.Closed += e => closeTcs.TrySetResult(null); - - await connection.StartAsync().OrTimeout(); - await Task.WhenAll(connection.StopAsync().OrTimeout(), connection.DisposeAsync().OrTimeout()); - await closeTcs.Task.OrTimeout(); - - var exception = await Assert.ThrowsAsync(() => connection.StartAsync()); - Assert.Equal("Cannot start a connection that is not in the Disconnected state.", exception.Message); - } - - [Fact] - public async Task SendThrowsIfConnectionIsNotStarted() - { - var connection = new HttpConnection(new Uri("http://fakeuri.org/")); - var exception = await Assert.ThrowsAsync( - async () => await connection.SendAsync(new byte[0])); - Assert.Equal("Cannot send messages when the connection is not in the Connected state.", exception.Message); - } - - [Fact] - public async Task SendThrowsIfConnectionIsDisposed() - { - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - return ResponseUtils.IsNegotiateRequest(request) - ? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse()) - : ResponseUtils.CreateResponse(HttpStatusCode.OK); - }); - - var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null, - httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); - - await connection.StartAsync(); - await connection.DisposeAsync(); - - var exception = await Assert.ThrowsAsync( - async () => await connection.SendAsync(new byte[0])); - Assert.Equal("Cannot send messages when the connection is not in the Connected state.", exception.Message); - } - - [Fact] - public async Task ClosedEventRaisedWhenTheClientIsBeingStopped() - { - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - return ResponseUtils.IsNegotiateRequest(request) - ? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse()) - : ResponseUtils.CreateResponse(HttpStatusCode.OK); - }); - - var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null, - httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); - - var closeTcs = new TaskCompletionSource(); - connection.Closed += e => - { - if (e != null) - { - closeTcs.SetException(e); - } - else - { - closeTcs.SetResult(null); - } - }; - await connection.StartAsync().OrTimeout(); - await connection.DisposeAsync().OrTimeout(); - await closeTcs.Task.OrTimeout(); - } - - [Fact] - public async Task ClosedEventRaisedWhenConnectionToServerLost() - { - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - - return request.Method == HttpMethod.Get - ? ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError) - : ResponseUtils.IsNegotiateRequest(request) - ? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse()) - : ResponseUtils.CreateResponse(HttpStatusCode.OK); - }); - - var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null, - httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); - var closeTcs = new TaskCompletionSource(); - connection.Closed += e => - { - if (e != null) - { - closeTcs.SetException(e); - } - else - { - closeTcs.SetResult(null); - } - }; - - try - { - await connection.StartAsync().OrTimeout(); - await Assert.ThrowsAsync(() => closeTcs.Task.OrTimeout()); - } - finally - { - await connection.DisposeAsync(); - } - } - - [Fact] - public async Task EventsAreNotRunningOnMainLoop() - { - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - return ResponseUtils.IsNegotiateRequest(request) - ? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse()) - : ResponseUtils.CreateResponse(HttpStatusCode.OK); - }); - var mockTransport = new Mock(); - Channel channel = null; - mockTransport.Setup(t => t.StartAsync(It.IsAny(), It.IsAny>(), It.IsAny(), It.IsAny(), It.IsAny())) - .Returns, TransferMode, string, IConnection>((url, c, transferMode, connectionId, _) => - { - channel = c; - return Task.CompletedTask; - }); - mockTransport.Setup(t => t.StopAsync()) - .Returns(() => - { - channel.Writer.TryComplete(); - return Task.CompletedTask; - }); - mockTransport.SetupGet(t => t.Mode).Returns(TransferMode.Text); + // If we exited because we were cancelled, throw. + cts.Token.ThrowIfCancellationRequested(); - var callbackInvokedTcs = new TaskCompletionSource(); - var closedTcs = new TaskCompletionSource(); - - var connection = new HttpConnection(new Uri("http://fakeuri.org/"), new TestTransportFactory(mockTransport.Object), loggerFactory: null, - httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); - - connection.OnReceived(_ => - { - callbackInvokedTcs.SetResult(null); - return closedTcs.Task; + // We're free! Unblock onreceived + onReceived.Continue(); }); - - await connection.StartAsync(); - channel.Writer.TryWrite(Array.Empty()); - - // Ensure that the Received callback has been called before attempting the second write - await callbackInvokedTcs.Task.OrTimeout(); - channel.Writer.TryWrite(Array.Empty()); - - // Ensure that SignalR isn't blocked by the receive callback - Assert.False(channel.Reader.TryRead(out var message)); - - closedTcs.SetResult(null); - - await connection.DisposeAsync(); } [Fact] @@ -558,647 +96,48 @@ public async Task EventQueueTimeout() { var logger = loggerFactory.CreateLogger(); - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - return ResponseUtils.IsNegotiateRequest(request) - ? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse()) - : ResponseUtils.CreateResponse(HttpStatusCode.OK); - }); - - var mockTransport = new Mock(); - Channel channel = null; - mockTransport.Setup(t => t.StartAsync(It.IsAny(), It.IsAny>(), It.IsAny(), It.IsAny(), It.IsAny())) - .Returns, TransferMode, string, IConnection>((url, c, transferMode, connectionId, _) => - { - logger.LogInformation("Transport started"); - channel = c; - return Task.CompletedTask; - }); - mockTransport.Setup(t => t.StopAsync()) - .Returns(() => - { - logger.LogInformation("Transport stopped"); - channel.Writer.TryComplete(); - return Task.CompletedTask; - }); - mockTransport.SetupGet(t => t.Mode).Returns(TransferMode.Text); - - var blockReceiveCallbackTcs = new TaskCompletionSource(); - var onReceivedCalledTcs = new TaskCompletionSource(); - - var connection = new HttpConnection(new Uri("http://fakeuri.org/"), new TestTransportFactory(mockTransport.Object), loggerFactory, - httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); - connection.OnReceived(async _ => - { - onReceivedCalledTcs.TrySetResult(null); - await blockReceiveCallbackTcs.Task; - }); - - logger.LogInformation("Starting connection"); - await connection.StartAsync().OrTimeout(); - logger.LogInformation("Started connection"); - channel.Writer.TryWrite(Array.Empty()); - await onReceivedCalledTcs.Task.OrTimeout(); - - // Ensure that SignalR isn't blocked by the receive callback - Assert.False(channel.Reader.TryRead(out var message)); - - logger.LogInformation("Disposing connection"); - await connection.DisposeAsync().OrTimeout(TimeSpan.FromSeconds(10)); - logger.LogInformation("Disposed connection"); - } - } - - [Fact] - public async Task EventQueueTimeoutWithException() - { - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - return ResponseUtils.IsNegotiateRequest(request) - ? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse()) - : ResponseUtils.CreateResponse(HttpStatusCode.OK); - }); - - var mockTransport = new Mock(); - Channel channel = null; - mockTransport.Setup(t => t.StartAsync(It.IsAny(), It.IsAny>(), It.IsAny(), It.IsAny(), It.IsAny())) - .Returns, TransferMode, string, IConnection>((url, c, transferMode, connectionId, _) => - { - channel = c; - return Task.CompletedTask; - }); - mockTransport.Setup(t => t.StopAsync()) - .Returns(() => - { - channel.Writer.TryComplete(); - return Task.CompletedTask; - }); - mockTransport.SetupGet(t => t.Mode).Returns(TransferMode.Text); - - var callbackInvokedTcs = new TaskCompletionSource(); - - var connection = new HttpConnection(new Uri("http://fakeuri.org/"), new TestTransportFactory(mockTransport.Object), loggerFactory: null, - httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); - connection.OnReceived(_ => - { - throw new OperationCanceledException(); - }); - - await connection.StartAsync(); - channel.Writer.TryWrite(Array.Empty()); - - // Ensure that SignalR isn't blocked by the receive callback - Assert.False(channel.Reader.TryRead(out var message)); - - await connection.DisposeAsync(); - } - - [Fact] - public async Task ClosedEventNotRaisedWhenTheClientIsStoppedButWasNeverStarted() - { - var connection = new HttpConnection(new Uri("http://fakeuri.org/")); - var closeInvoked = false; - connection.Closed += e => closeInvoked = true; - await connection.DisposeAsync(); - Assert.False(closeInvoked); - } - - [Fact] - public async Task TransportIsStoppedWhenConnectionIsStopped() - { - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - - return ResponseUtils.IsNegotiateRequest(request) - ? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse()) - : ResponseUtils.CreateResponse(HttpStatusCode.OK); - }); - - using (var httpClient = new HttpClient(mockHttpHandler.Object)) - { - var longPollingTransport = new LongPollingTransport(httpClient, null, new LoggerFactory()); - var connection = new HttpConnection(new Uri("http://fakeuri.org/"), new TestTransportFactory(longPollingTransport), loggerFactory: null, - httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); - - try - { - await connection.StartAsync(); - - Assert.False(longPollingTransport.Running.IsCompleted); - } - finally - { - await connection.DisposeAsync(); - } - - await longPollingTransport.Running.OrTimeout(); - } - } - - [Fact] - public async Task CanSendData() - { - var data = new byte[] { 1, 1, 2, 3, 5, 8 }; - - var sendTcs = new TaskCompletionSource(); - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - if (ResponseUtils.IsNegotiateRequest(request)) - { - return ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse()); - } - - if (request.Method == HttpMethod.Post) - { - sendTcs.SetResult(await request.Content.ReadAsByteArrayAsync()); - } - - return ResponseUtils.CreateResponse(HttpStatusCode.OK); - }); - - var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null, - httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); - try - { - await connection.StartAsync(); - - await connection.SendAsync(data); - - Assert.Equal(data, await sendTcs.Task.OrTimeout()); - } - finally - { - await connection.DisposeAsync(); - } - } - - [Fact] - public async Task SendAsyncThrowsIfConnectionIsNotStarted() - { - var connection = new HttpConnection(new Uri("http://fakeuri.org/")); - var exception = await Assert.ThrowsAsync( - async () => await connection.SendAsync(new byte[0])); - - Assert.Equal("Cannot send messages when the connection is not in the Connected state.", exception.Message); - } - - [Fact] - public async Task SendAsyncThrowsIfConnectionIsDisposed() - { - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - - var content = string.Empty; - if (request.Method == HttpMethod.Get) - { - content = "T2:T:42;"; - } - - return ResponseUtils.IsNegotiateRequest(request) - ? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse()) - : ResponseUtils.CreateResponse(HttpStatusCode.OK, content); - }); - - var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null, - httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); - - await connection.StartAsync(); - await connection.DisposeAsync(); - - var exception = await Assert.ThrowsAsync( - async () => await connection.SendAsync(new byte[0])); - - Assert.Equal("Cannot send messages when the connection is not in the Connected state.", exception.Message); - } - - [Fact] - public async Task CallerReceivesExceptionsFromSendAsync() - { - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - - return ResponseUtils.IsNegotiateRequest(request) - ? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse()) - : request.Method == HttpMethod.Post - ? ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError) - : ResponseUtils.CreateResponse(HttpStatusCode.OK); - }); - - var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null, - httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); - await connection.StartAsync(); - - var exception = await Assert.ThrowsAsync( - async () => await connection.SendAsync(new byte[0])); - - await connection.DisposeAsync(); - } - - [Fact] - public async Task CanReceiveData() - { - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - - var content = string.Empty; - - if (request.Method == HttpMethod.Get) - { - content = "42"; - } - - return ResponseUtils.IsNegotiateRequest(request) - ? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse()) - : ResponseUtils.CreateResponse(HttpStatusCode.OK, content); - }); - - var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null, - httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); - try - { - var receiveTcs = new TaskCompletionSource(); - connection.OnReceived((data, state) => - { - var tcs = ((TaskCompletionSource)state); - tcs.TrySetResult(Encoding.UTF8.GetString(data)); - return Task.CompletedTask; - }, receiveTcs); - - connection.Closed += e => - { - if (e != null) - { - receiveTcs.TrySetException(e); - } - else - { - receiveTcs.TrySetCanceled(); - } - }; - - await connection.StartAsync().OrTimeout(); - Assert.Equal("42", await receiveTcs.Task.OrTimeout()); - } - finally - { - await connection.DisposeAsync(); - } - } - - [Fact] - public async Task CanReceiveDataEvenIfExceptionThrownFromPreviousReceivedEvent() - { - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - - var content = string.Empty; - - if (request.Method == HttpMethod.Get) - { - content = "42"; - } - - return ResponseUtils.IsNegotiateRequest(request) - ? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse()) - : ResponseUtils.CreateResponse(HttpStatusCode.OK, content); - }); - - var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null, - httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); - try - { - var receiveTcs = new TaskCompletionSource(); - - var receivedRaised = false; - connection.OnReceived(data => - { - if (!receivedRaised) - { - receivedRaised = true; - return Task.FromException(new InvalidOperationException()); - } - - receiveTcs.TrySetResult(Encoding.UTF8.GetString(data)); - return Task.CompletedTask; - }); - - connection.Closed += e => - { - if (e != null) - { - receiveTcs.TrySetException(e); - } - else - { - receiveTcs.TrySetCanceled(); - } - }; - - await connection.StartAsync(); - - Assert.Equal("42", await receiveTcs.Task.OrTimeout()); - } - finally - { - await connection.DisposeAsync(); - } - } - - [Fact] - public async Task CanReceiveDataEvenIfExceptionThrownSynchronouslyFromPreviousReceivedEvent() - { - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - - var content = string.Empty; - - if (request.Method == HttpMethod.Get) - { - content = "42"; - } - - return ResponseUtils.IsNegotiateRequest(request) - ? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse()) - : ResponseUtils.CreateResponse(HttpStatusCode.OK, content); - }); - - var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null, - httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); - try - { - var receiveTcs = new TaskCompletionSource(); + var testTransport = new TestTransport(); - var receivedRaised = false; - connection.OnReceived((data) => - { - if (!receivedRaised) + await WithConnectionAsync( + CreateConnection(transport: testTransport), + async (connection, closed) => { - receivedRaised = true; - throw new InvalidOperationException(); - } - - receiveTcs.TrySetResult(Encoding.UTF8.GetString(data)); - return Task.CompletedTask; - }); - - connection.Closed += e => - { - if (e != null) - { - receiveTcs.TrySetException(e); - } - else - { - receiveTcs.TrySetCanceled(); - } - }; - - await connection.StartAsync(); - - Assert.Equal("42", await receiveTcs.Task.OrTimeout()); - } - finally - { - await connection.DisposeAsync(); - } - } + var onReceived = new SyncPoint(); + connection.OnReceived(_ => onReceived.WaitToContinue().OrTimeout()); - [Fact] - public async Task CannotSendAfterReceiveThrewException() - { - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); + logger.LogInformation("Starting connection"); + await connection.StartAsync().OrTimeout(); + logger.LogInformation("Started connection"); - return request.Method == HttpMethod.Get - ? ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError) - : ResponseUtils.IsNegotiateRequest(request) - ? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse()) - : ResponseUtils.CreateResponse(HttpStatusCode.OK); - }); - - var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null, - httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); - try - { - var closeTcs = new TaskCompletionSource(); - connection.Closed += e => - { - if (e != null) - { - closeTcs.SetException(e); - } - else - { - closeTcs.SetResult(null); - } - }; + testTransport.Application.Writer.TryWrite(Array.Empty()); + await onReceived.WaitForSyncPoint().OrTimeout(); - await connection.StartAsync().OrTimeout(); - await Assert.ThrowsAsync(() => closeTcs.Task.OrTimeout()); - var exception = await Assert.ThrowsAsync(() => connection.SendAsync(new byte[0])); + // Dispose should complete, even though the receive callbacks are completely blocked up. + logger.LogInformation("Disposing connection"); + await connection.DisposeAsync().OrTimeout(TimeSpan.FromSeconds(10)); + logger.LogInformation("Disposed connection"); - Assert.Equal("Cannot send messages when the connection is not in the Connected state.", exception.Message); - } - finally - { - await connection.DisposeAsync(); + // Clear up blocked tasks. + onReceived.Continue(); + }); } } - [Theory] - [InlineData("")] - [InlineData("Not Json")] - public async Task StartThrowsFormatExceptionIfNegotiationResponseIsInvalid(string negotiatePayload) - { - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - return ResponseUtils.CreateResponse(HttpStatusCode.OK, negotiatePayload); - }); - - var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null, - httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); - var exception = await Assert.ThrowsAsync( - () => connection.StartAsync()); - - Assert.Equal("Invalid negotiation response received.", exception.Message); - } - - [Fact] - public async Task StartThrowsFormatExceptionIfNegotiationResponseHasNoConnectionId() - { - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - return ResponseUtils.CreateResponse(HttpStatusCode.OK, - ResponseUtils.CreateNegotiationResponse(connectionId: null)); - }); - - var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null, - httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); - var exception = await Assert.ThrowsAsync( - () => connection.StartAsync()); - - Assert.Equal("Invalid connection id returned in negotiation response.", exception.Message); - } - - [Fact] - public async Task StartThrowsFormatExceptionIfNegotiationResponseHasNoTransports() - { - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - return ResponseUtils.CreateResponse(HttpStatusCode.OK, - ResponseUtils.CreateNegotiationResponse(transportTypes: null)); - }); - - var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null, - httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); - var exception = await Assert.ThrowsAsync( - () => connection.StartAsync()); - - Assert.Equal("No transports returned in negotiation response.", exception.Message); - } - - [Theory] - [InlineData((TransportType)0)] - [InlineData(TransportType.ServerSentEvents)] - public async Task ConnectionCannotBeStartedIfNoCommonTransportsBetweenClientAndServer(TransportType serverTransports) - { - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - return ResponseUtils.CreateResponse(HttpStatusCode.OK, - ResponseUtils.CreateNegotiationResponse(transportTypes: serverTransports)); - }); - - var connection = new HttpConnection(new Uri("http://fakeuri.org/"), TransportType.LongPolling, loggerFactory: null, - httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); - var exception = await Assert.ThrowsAsync( - () => connection.StartAsync()); - - Assert.Equal("No requested transports available on the server.", exception.Message); - } - [Fact] - public async Task CanStartConnectionWithoutSettingTransferModeFeature() + public async Task StartAsyncSetsTransferModeFeature() { - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - return ResponseUtils.IsNegotiateRequest(request) - ? ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationResponse()) - : ResponseUtils.CreateResponse(HttpStatusCode.OK); - }); - - var mockTransport = new Mock(); - Channel channel = null; - mockTransport.Setup(t => t.StartAsync(It.IsAny(), It.IsAny>(), It.IsAny(), It.IsAny(), It.IsAny())) - .Returns, TransferMode, string, IConnection>((url, c, transferMode, connectionId, _) => - { - channel = c; - return Task.CompletedTask; - }); - mockTransport.Setup(t => t.StopAsync()) - .Returns(() => + var testTransport = new TestTransport(transferMode: TransferMode.Binary); + await WithConnectionAsync( + CreateConnection(transport: testTransport), + async (connection, closed) => { - channel.Writer.TryComplete(); - return Task.CompletedTask; - }); - mockTransport.SetupGet(t => t.Mode).Returns(TransferMode.Binary); - - var connection = new HttpConnection(new Uri("http://fakeuri.org/"), new TestTransportFactory(mockTransport.Object), - loggerFactory: null, httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); - - await connection.StartAsync().OrTimeout(); - var transferModeFeature = connection.Features.Get(); - await connection.DisposeAsync().OrTimeout(); + Assert.Null(connection.Features.Get()); + await connection.StartAsync().OrTimeout(); - mockTransport.Verify(t => t.StartAsync( - It.IsAny(), It.IsAny>(), TransferMode.Text, It.IsAny(), It.IsAny()), Times.Once); - Assert.NotNull(transferModeFeature); - Assert.Equal(TransferMode.Binary, transferModeFeature.TransferMode); - } - - [Theory] - [InlineData("http://fakeuri.org/", "http://fakeuri.org/negotiate")] - [InlineData("http://fakeuri.org/?q=1/0", "http://fakeuri.org/negotiate?q=1/0")] - [InlineData("http://fakeuri.org?q=1/0", "http://fakeuri.org/negotiate?q=1/0")] - [InlineData("http://fakeuri.org/endpoint", "http://fakeuri.org/endpoint/negotiate")] - [InlineData("http://fakeuri.org/endpoint/", "http://fakeuri.org/endpoint/negotiate")] - [InlineData("http://fakeuri.org/endpoint?q=1/0", "http://fakeuri.org/endpoint/negotiate?q=1/0")] - public async Task CorrectlyHandlesQueryStringWhenAppendingNegotiateToUrl(string requested, string expectedNegotiate) - { - var mockHttpHandler = new Mock(); - mockHttpHandler.Protected() - .Setup>("SendAsync", ItExpr.IsAny(), ItExpr.IsAny()) - .Returns(async (request, cancellationToken) => - { - await Task.Yield(); - Assert.Equal(expectedNegotiate, request.RequestUri.ToString()); - return ResponseUtils.CreateResponse(HttpStatusCode.OK, - ResponseUtils.CreateNegotiationResponse()); + var transferModeFeature = connection.Features.Get(); + Assert.NotNull(transferModeFeature); + Assert.Equal(TransferMode.Binary, transferModeFeature.TransferMode); }); - - var connection = new HttpConnection(new Uri(requested), TransportType.LongPolling, loggerFactory: null, - httpOptions: new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); - await connection.StartAsync().OrTimeout(); - await connection.DisposeAsync().OrTimeout(); } } } diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/ResponseUtils.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/ResponseUtils.cs index 0db15aa9af..32b457173d 100644 --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/ResponseUtils.cs +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/ResponseUtils.cs @@ -35,7 +35,7 @@ public static bool IsNegotiateRequest(HttpRequestMessage request) new UriBuilder(request.RequestUri).Path.EndsWith("/negotiate"); } - public static string CreateNegotiationResponse(string connectionId = "00000000-0000-0000-0000-000000000000", + public static string CreateNegotiationContent(string connectionId = "00000000-0000-0000-0000-000000000000", SocketsTransportType? transportTypes = SocketsTransportType.All) { var sb = new StringBuilder("{ "); diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/TestHttpMessageHandler.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/TestHttpMessageHandler.cs index c12f1c05aa..348980bfa1 100644 --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/TestHttpMessageHandler.cs +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/TestHttpMessageHandler.cs @@ -9,17 +9,128 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests { public class TestHttpMessageHandler : HttpMessageHandler { - protected override Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) + private Func> _handler; + + public TestHttpMessageHandler(bool autoNegotiate = true) { - if (ResponseUtils.IsNegotiateRequest(request)) + _handler = (request, cancellationToken) => BaseHandler(request, cancellationToken); + + if (autoNegotiate) { - return Task.FromResult(ResponseUtils.CreateResponse(HttpStatusCode.OK, - ResponseUtils.CreateNegotiationResponse())); + OnNegotiate((_, cancellationToken) => ResponseUtils.CreateResponse(HttpStatusCode.OK, ResponseUtils.CreateNegotiationContent())); } - else + } + + protected override async Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) + { + await Task.Yield(); + + return await _handler(request, cancellationToken); + } + + public static HttpMessageHandler CreateDefault() + { + var testHttpMessageHandler = new TestHttpMessageHandler(); + + testHttpMessageHandler.OnSocketSend((_, __) => ResponseUtils.CreateResponse(HttpStatusCode.Accepted)); + testHttpMessageHandler.OnLongPoll(async cancellationToken => { - return Task.FromException(new InvalidOperationException($"Http endpoint not implemented: {request.RequestUri}")); - } + // Just block until canceled + var tcs = new TaskCompletionSource(); + using (cancellationToken.Register(() => tcs.TrySetResult(null))) + { + await tcs.Task; + } + return ResponseUtils.CreateResponse(HttpStatusCode.NoContent); + }); + + return testHttpMessageHandler; + } + + public void OnRequest(Func>, CancellationToken, Task> handler) + { + var nextHandler = _handler; + _handler = (request, cancellationToken) => handler(request, () => nextHandler(request, cancellationToken), cancellationToken); + } + + public void OnGet(string pathAndQuery, Func> handler) => OnRequest(HttpMethod.Get, pathAndQuery, handler); + public void OnPost(string pathAndQuery, Func> handler) => OnRequest(HttpMethod.Post, pathAndQuery, handler); + public void OnPut(string pathAndQuery, Func> handler) => OnRequest(HttpMethod.Put, pathAndQuery, handler); + public void OnDelete(string pathAndQuery, Func> handler) => OnRequest(HttpMethod.Delete, pathAndQuery, handler); + public void OnHead(string pathAndQuery, Func> handler) => OnRequest(HttpMethod.Head, pathAndQuery, handler); + public void OnOptions(string pathAndQuery, Func> handler) => OnRequest(HttpMethod.Options, pathAndQuery, handler); + public void OnTrace(string pathAndQuery, Func> handler) => OnRequest(HttpMethod.Trace, pathAndQuery, handler); + + public void OnRequest(HttpMethod method, string pathAndQuery, Func> handler) + { + OnRequest((request, next, cancellationToken) => + { + if (request.Method.Equals(method) && string.Equals(request.RequestUri.PathAndQuery, pathAndQuery)) + { + return handler(request, cancellationToken); + } + else + { + return next(); + } + }); + } + + public void OnNegotiate(Func handler) => OnNegotiate((req, cancellationToken) => Task.FromResult(handler(req, cancellationToken))); + + public void OnNegotiate(Func> handler) + { + OnRequest((request, next, cancellationToken) => + { + if (ResponseUtils.IsNegotiateRequest(request)) + { + return handler(request, cancellationToken); + } + else + { + return next(); + } + }); + } + + public void OnLongPoll(Func handler) => OnLongPoll(cancellationToken => Task.FromResult(handler(cancellationToken))); + + public void OnLongPoll(Func> handler) + { + OnRequest((request, next, cancellationToken) => + { + if (request.Method.Equals(HttpMethod.Get) && request.RequestUri.PathAndQuery.StartsWith("/?id=")) + { + return handler(cancellationToken); + } + else + { + return next(); + } + }); + } + + public void OnSocketSend(Func handler) => OnSocketSend((data, cancellationToken) => Task.FromResult(handler(data, cancellationToken))); + + public void OnSocketSend(Func> handler) + { + OnRequest(async (request, next, cancellationToken) => + { + if (request.Method.Equals(HttpMethod.Post) && request.RequestUri.PathAndQuery.StartsWith("/?id=")) + { + var data = await request.Content.ReadAsByteArrayAsync(); + return await handler(data, cancellationToken); + } + else + { + return await next(); + } + }); + } + + private Task BaseHandler(HttpRequestMessage request, CancellationToken cancellationToken) + { + return Task.FromException(new InvalidOperationException($"Http endpoint not implemented: {request.Method} {request.RequestUri}")); } } } diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/TestTransport.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/TestTransport.cs new file mode 100644 index 0000000000..b32808fc8b --- /dev/null +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/TestTransport.cs @@ -0,0 +1,36 @@ +using System; +using System.Threading.Channels; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Sockets; +using Microsoft.AspNetCore.Sockets.Client; + +namespace Microsoft.AspNetCore.SignalR.Client.Tests +{ + public class TestTransport : ITransport + { + private readonly Func _stopHandler; + private readonly Func _startHandler; + + public TransferMode? Mode { get; } + public Channel Application { get; private set; } + + public TestTransport(Func onTransportStop = null, Func onTransportStart = null, TransferMode transferMode = TransferMode.Text) + { + _stopHandler = onTransportStop ?? new Func(() => Task.CompletedTask); + _startHandler = onTransportStart ?? new Func(() => Task.CompletedTask); + Mode = transferMode; + } + + public Task StartAsync(Uri url, Channel application, TransferMode requestedTransferMode, string connectionId, IConnection connection) + { + Application = application; + return _startHandler(); + } + + public async Task StopAsync() + { + await _stopHandler(); + Application.Writer.TryComplete(); + } + } +} diff --git a/test/Microsoft.AspNetCore.SignalR.Client.Tests/TestTransportFactory.cs b/test/Microsoft.AspNetCore.SignalR.Client.Tests/TestTransportFactory.cs index 258856bc23..82b7fc1d31 100644 --- a/test/Microsoft.AspNetCore.SignalR.Client.Tests/TestTransportFactory.cs +++ b/test/Microsoft.AspNetCore.SignalR.Client.Tests/TestTransportFactory.cs @@ -1,10 +1,12 @@ -// Copyright (c) .NET Foundation. All rights reserved. +// Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System.Net.Http; +using Microsoft.AspNetCore.Sockets; +using Microsoft.AspNetCore.Sockets.Client; using Microsoft.Extensions.Logging; -namespace Microsoft.AspNetCore.Sockets.Client.Tests +namespace Microsoft.AspNetCore.SignalR.Client.Tests { public class TestTransportFactory : ITransportFactory { @@ -20,4 +22,4 @@ public ITransport CreateTransport(TransportType availableServerTransports) return _transport; } } -} \ No newline at end of file +} diff --git a/test/Microsoft.AspNetCore.SignalR.Tests.Utils/ServerFixture.cs b/test/Microsoft.AspNetCore.SignalR.Tests.Utils/ServerFixture.cs index 1151bfb3b1..0a8f6bc63a 100644 --- a/test/Microsoft.AspNetCore.SignalR.Tests.Utils/ServerFixture.cs +++ b/test/Microsoft.AspNetCore.SignalR.Tests.Utils/ServerFixture.cs @@ -5,10 +5,12 @@ using System.IO; using System.Net; using System.Net.Sockets; +using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Logging.Testing; namespace Microsoft.AspNetCore.SignalR.Tests @@ -21,6 +23,7 @@ public class ServerFixture : IDisposable private IWebHost _host; private IApplicationLifetime _lifetime; private readonly IDisposable _logToken; + private AsyncForwardingLoggerProvider _asyncLoggerProvider; public string WebSocketsUrl => Url.Replace("http", "ws"); @@ -28,14 +31,22 @@ public class ServerFixture : IDisposable public ServerFixture() { + _asyncLoggerProvider = new AsyncForwardingLoggerProvider(); + var testLog = AssemblyTestLog.ForAssembly(typeof(TStartup).Assembly); _logToken = testLog.StartTestLog(null, $"{nameof(ServerFixture)}_{typeof(TStartup).Name}", out _loggerFactory, "ServerFixture"); + _loggerFactory.AddProvider(_asyncLoggerProvider); _logger = _loggerFactory.CreateLogger>(); Url = "http://localhost:" + GetNextPort(); StartServer(Url); } + public void SetTestLoggerFactory(ILoggerFactory loggerFactory) + { + _asyncLoggerProvider.SetLoggerFactory(loggerFactory); + } + private void StartServer(string url) { _host = new WebHostBuilder() @@ -74,6 +85,62 @@ public void Dispose() _loggerFactory.Dispose(); } + private class AsyncForwardingLoggerProvider : ILoggerProvider + { + private AsyncLocal _localLogger = new AsyncLocal(); + + public ILogger CreateLogger(string categoryName) + { + return new AsyncLocalForwardingLogger(categoryName, _localLogger); + } + + public void Dispose() + { + } + + public void SetLoggerFactory(ILoggerFactory loggerFactory) + { + _localLogger.Value = loggerFactory; + } + + private class AsyncLocalForwardingLogger : ILogger + { + private string _categoryName; + private AsyncLocal _localLoggerFactory; + + public AsyncLocalForwardingLogger(string categoryName, AsyncLocal localLoggerFactory) + { + _categoryName = categoryName; + _localLoggerFactory = localLoggerFactory; + } + + public IDisposable BeginScope(TState state) + { + return GetLocalLogger().BeginScope(state); + } + + public bool IsEnabled(LogLevel logLevel) + { + return GetLocalLogger().IsEnabled(logLevel); + } + + public void Log(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func formatter) + { + GetLocalLogger().Log(logLevel, eventId, state, exception, formatter); + } + + private ILogger GetLocalLogger() + { + var factory = _localLoggerFactory.Value; + if (factory == null) + { + return NullLogger.Instance; + } + return factory.CreateLogger(_categoryName); + } + } + } + private class ForwardingLoggerProvider : ILoggerProvider { private readonly ILoggerFactory _loggerFactory; diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/EndToEndTests.cs b/test/Microsoft.AspNetCore.SignalR.Tests/EndToEndTests.cs index 5e50df59ea..c487e98645 100644 --- a/test/Microsoft.AspNetCore.SignalR.Tests/EndToEndTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Tests/EndToEndTests.cs @@ -114,7 +114,7 @@ public async Task HTTPRequestsNotSentWhenWebSocketsTransportRequested() .Returns( (request, cancellationToken) => Task.FromException(new InvalidOperationException("HTTP requests should not be sent."))); - var connection = new HttpConnection(new Uri(url), TransportType.WebSockets, loggerFactory, new HttpOptions { HttpMessageHandler = mockHttpHandler.Object}); + var connection = new HttpConnection(new Uri(url), TransportType.WebSockets, loggerFactory, new HttpOptions { HttpMessageHandler = mockHttpHandler.Object }); try { @@ -326,6 +326,8 @@ private async Task ServerClosesConnectionWithErrorIfHubCannotBeCreated(Transport { using (StartLog(out var loggerFactory, testName: $"ConnectionCanSendAndReceiveMessages_{transportType.ToString()}")) { + _serverFixture.SetTestLoggerFactory(loggerFactory); + var logger = loggerFactory.CreateLogger(); var url = _serverFixture.Url + "/uncreatable"; @@ -351,13 +353,24 @@ private async Task ServerClosesConnectionWithErrorIfHubCannotBeCreated(Transport }; logger.LogInformation("Starting connection to {url}", url); - await connection.StartAsync().OrTimeout(); + + try + { + await connection.StartAsync().OrTimeout(); + } + catch (OperationCanceledException) + { + // Due to a race, this can fail with OperationCanceledException in the SendAsync + // call that HubConnection does to send the negotiate message. + // This has only been happening on AppVeyor, likely due to a slower CI machine + // The closed event will still fire with the exception we care about. + } await closeTcs.Task.OrTimeout(); } catch (Exception ex) { - logger.LogInformation(ex, "Test threw exception"); + logger.LogError(ex, "Test threw {exceptionType}: {message}", ex.GetType(), ex.Message); throw; } finally diff --git a/test/xunit.runner.json b/test/xunit.runner.json index e1589333fe..874abebe0b 100644 --- a/test/xunit.runner.json +++ b/test/xunit.runner.json @@ -1,4 +1,4 @@ { "longRunningTestSeconds": 5, - "diagnosticMessages": false -} \ No newline at end of file + "diagnosticMessages": true +}