diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java index 12349152dc1e..b0bcbb364829 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java @@ -66,6 +66,7 @@ public class HubConnection implements AutoCloseable { private final int negotiateVersion = 1; private final Logger logger = LoggerFactory.getLogger(HubConnection.class); private ScheduledExecutorService handshakeTimeout = null; + private Completable start; /** * Sets the server timeout interval for the connection. @@ -341,83 +342,99 @@ public void setBaseUrl(String url) { * @return A Completable that completes when the connection has been established. */ public Completable start() { - if (hubConnectionState != HubConnectionState.DISCONNECTED) { - return Completable.complete(); - } - - handshakeResponseSubject = CompletableSubject.create(); - handshakeReceived = false; - CompletableSubject tokenCompletable = CompletableSubject.create(); - localHeaders.put(UserAgentHelper.getUserAgentName(), UserAgentHelper.createUserAgentString()); - if (headers != null) { - this.localHeaders.putAll(headers); - } + CompletableSubject localStart = CompletableSubject.create(); - accessTokenProvider.subscribe(token -> { - if (token != null && !token.isEmpty()) { - this.localHeaders.put("Authorization", "Bearer " + token); + hubConnectionStateLock.lock(); + try { + if (hubConnectionState != HubConnectionState.DISCONNECTED) { + logger.debug("The connection is in the '{}' state. Waiting for in-progress start to complete or completing this start immediately.", hubConnectionState); + return start; } - tokenCompletable.onComplete(); - }, error -> { - tokenCompletable.onError(error); - }); - stopError = null; - Single negotiate = null; - if (!skipNegotiate) { - negotiate = tokenCompletable.andThen(Single.defer(() -> startNegotiate(baseUrl, 0))); - } else { - negotiate = tokenCompletable.andThen(Single.defer(() -> Single.just(new NegotiateResponse(baseUrl)))); - } + hubConnectionState = HubConnectionState.CONNECTING; + start = localStart; - CompletableSubject start = CompletableSubject.create(); + handshakeResponseSubject = CompletableSubject.create(); + handshakeReceived = false; + CompletableSubject tokenCompletable = CompletableSubject.create(); + localHeaders.put(UserAgentHelper.getUserAgentName(), UserAgentHelper.createUserAgentString()); + if (headers != null) { + this.localHeaders.putAll(headers); + } - negotiate.flatMapCompletable(negotiateResponse -> { - logger.debug("Starting HubConnection."); - if (transport == null) { - Single tokenProvider = negotiateResponse.getAccessToken() != null ? Single.just(negotiateResponse.getAccessToken()) : accessTokenProvider; - switch (transportEnum) { - case LONG_POLLING: - transport = new LongPollingTransport(localHeaders, httpClient, tokenProvider); - break; - default: - transport = new WebSocketTransport(localHeaders, httpClient); + accessTokenProvider.subscribe(token -> { + if (token != null && !token.isEmpty()) { + this.localHeaders.put("Authorization", "Bearer " + token); } + tokenCompletable.onComplete(); + }, error -> { + tokenCompletable.onError(error); + }); + + stopError = null; + Single negotiate = null; + if (!skipNegotiate) { + negotiate = tokenCompletable.andThen(Single.defer(() -> startNegotiate(baseUrl, 0))); + } else { + negotiate = tokenCompletable.andThen(Single.defer(() -> Single.just(new NegotiateResponse(baseUrl)))); } - transport.setOnReceive(this.callback); - transport.setOnClose((message) -> stopConnection(message)); - - return transport.start(negotiateResponse.getFinalUrl()).andThen(Completable.defer(() -> { - ByteBuffer handshake = HandshakeProtocol.createHandshakeRequestMessage( - new HandshakeRequestMessage(protocol.getName(), protocol.getVersion())); - - connectionState = new ConnectionState(this); - - return transport.send(handshake).andThen(Completable.defer(() -> { - timeoutHandshakeResponse(handshakeResponseTimeout, TimeUnit.MILLISECONDS); - return handshakeResponseSubject.andThen(Completable.defer(() -> { - hubConnectionStateLock.lock(); - try { - hubConnectionState = HubConnectionState.CONNECTED; - logger.info("HubConnection started."); - resetServerTimeout(); - //Don't send pings if we're using long polling. - if (transportEnum != TransportEnum.LONG_POLLING) { - activatePingTimer(); + negotiate.flatMapCompletable(negotiateResponse -> { + logger.debug("Starting HubConnection."); + if (transport == null) { + Single tokenProvider = negotiateResponse.getAccessToken() != null ? Single.just(negotiateResponse.getAccessToken()) : accessTokenProvider; + switch (transportEnum) { + case LONG_POLLING: + transport = new LongPollingTransport(localHeaders, httpClient, tokenProvider); + break; + default: + transport = new WebSocketTransport(localHeaders, httpClient); + } + } + + transport.setOnReceive(this.callback); + transport.setOnClose((message) -> stopConnection(message)); + + return transport.start(negotiateResponse.getFinalUrl()).andThen(Completable.defer(() -> { + ByteBuffer handshake = HandshakeProtocol.createHandshakeRequestMessage( + new HandshakeRequestMessage(protocol.getName(), protocol.getVersion())); + + connectionState = new ConnectionState(this); + + return transport.send(handshake).andThen(Completable.defer(() -> { + timeoutHandshakeResponse(handshakeResponseTimeout, TimeUnit.MILLISECONDS); + return handshakeResponseSubject.andThen(Completable.defer(() -> { + hubConnectionStateLock.lock(); + try { + hubConnectionState = HubConnectionState.CONNECTED; + logger.info("HubConnection started."); + resetServerTimeout(); + //Don't send pings if we're using long polling. + if (transportEnum != TransportEnum.LONG_POLLING) { + activatePingTimer(); + } + } finally { + hubConnectionStateLock.unlock(); } - } finally { - hubConnectionStateLock.unlock(); - } - return Completable.complete(); + return Completable.complete(); + })); })); })); - })); - // subscribe makes this a "hot" completable so this runs immediately - }).subscribeWith(start); + // subscribe makes this a "hot" completable so this runs immediately + }).subscribe(() -> { + localStart.onComplete(); + }, error -> { + hubConnectionStateLock.lock(); + hubConnectionState = HubConnectionState.DISCONNECTED; + hubConnectionStateLock.unlock(); + localStart.onError(error); + }); + } finally { + hubConnectionStateLock.unlock(); + } - return start; + return localStart; } private void activatePingTimer() { @@ -445,8 +462,8 @@ public void run() { } private Single startNegotiate(String url, int negotiateAttempts) { - if (hubConnectionState != HubConnectionState.DISCONNECTED) { - return Single.just(null); + if (hubConnectionState != HubConnectionState.CONNECTING) { + throw new RuntimeException("HubConnection trying to negotiate when not in the CONNECTING state."); } return handleNegotiate(url).flatMap(response -> { diff --git a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnectionState.java b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnectionState.java index a357946b6aec..d3fc796ee410 100644 --- a/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnectionState.java +++ b/src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnectionState.java @@ -9,4 +9,5 @@ public enum HubConnectionState { CONNECTED, DISCONNECTED, + CONNECTING, } \ No newline at end of file diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java index efee55e3ef3a..89b5a0efdd9e 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java @@ -24,6 +24,7 @@ import io.reactivex.Observable; import io.reactivex.Single; import io.reactivex.disposables.Disposable; +import io.reactivex.schedulers.Schedulers; import io.reactivex.subjects.CompletableSubject; import io.reactivex.subjects.PublishSubject; import io.reactivex.subjects.ReplaySubject; @@ -2577,9 +2578,10 @@ public void receiveHandshakeResponseAndMessage() { value.getAndUpdate((val) -> val + 1); }); + SingleSubject handshakeMessageTask = mockTransport.getNextSentMessage(); // On start we're going to receive the handshake response and also an invocation in the same payload. hubConnection.start(); - mockTransport.getStartTask().timeout(1, TimeUnit.SECONDS).blockingAwait(); + ByteBuffer sentMessage = handshakeMessageTask.timeout(1, TimeUnit.SECONDS).blockingGet(); String expectedSentMessage = "{\"protocol\":\"json\",\"version\":1}" + RECORD_SEPARATOR; assertEquals(expectedSentMessage, TestUtils.byteBufferToString(mockTransport.getSentMessages()[0])); @@ -2647,7 +2649,7 @@ public void hubConnectionClosesAndRunsOnClosedCallbackAfterCloseMessageWithError } @Test - public void callingStartOnStartedHubConnectionNoOps() { + public void callingStartOnStartedHubConnectionNoops() { HubConnection hubConnection = TestUtils.createHubConnection("http://example.com"); hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); @@ -2655,7 +2657,35 @@ public void callingStartOnStartedHubConnectionNoOps() { hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); - hubConnection.stop(); + hubConnection.stop().timeout(1, TimeUnit.SECONDS).blockingAwait(); + assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState()); + } + + @Test + public void callingStartOnStartingHubConnectionWaitsForOriginalStart() { + CompletableSubject startedAccessToken = CompletableSubject.create(); + CompletableSubject continueAccessToken = CompletableSubject.create(); + HubConnection hubConnection = HubConnectionBuilder.create("http://example.com") + .withTransportImplementation(new MockTransport(true)) + .withHttpClient(new TestHttpClient()) + .withAccessTokenProvider(Single.defer(() -> { + startedAccessToken.onComplete(); + continueAccessToken.timeout(1, TimeUnit.SECONDS).blockingAwait(); + return Single.just("test"); + }).subscribeOn(Schedulers.newThread())) + .shouldSkipNegotiate(true) + .build(); + Completable start = hubConnection.start(); + startedAccessToken.timeout(1, TimeUnit.SECONDS).blockingAwait(); + assertEquals(HubConnectionState.CONNECTING, hubConnection.getConnectionState()); + + Completable start2 = hubConnection.start(); + continueAccessToken.onComplete(); + start.timeout(1, TimeUnit.SECONDS).blockingAwait(); + start2.timeout(1, TimeUnit.SECONDS).blockingAwait(); + assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); + + hubConnection.stop().timeout(1, TimeUnit.SECONDS).blockingAwait(); assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState()); } @@ -3595,9 +3625,6 @@ public void userSetAuthHeaderIsNotClearedAfterRedirect() { assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); hubConnection.stop().blockingAwait(); assertEquals("ExampleValue", beforeRedirectHeader.get()); - - hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); - assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); assertEquals("Bearer redirectToken", afterRedirectHeader.get()); // Making sure you can do this after restarting the HubConnection. @@ -3605,9 +3632,6 @@ public void userSetAuthHeaderIsNotClearedAfterRedirect() { assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); hubConnection.stop().blockingAwait(); assertEquals("ExampleValue", beforeRedirectHeader.get()); - - hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait(); - assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState()); assertEquals("Bearer redirectToken", afterRedirectHeader.get()); } @@ -3699,7 +3723,7 @@ public void non200FromNegotiateThrowsError() { } @Test - public void hubConnectionCloseCallsStop() throws Exception { + public void hubConnectionCloseCallsStop() { MockTransport mockTransport = new MockTransport(); TestHttpClient client = new TestHttpClient() .on("POST", "http://example.com/negotiate?negotiateVersion=1", (req) -> Single.just(new HttpResponse(200, "", TestUtils.stringToByteBuffer("{\"url\":\"http://testexample.com/\"}")))) diff --git a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/sample/Chat.java b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/sample/Chat.java index d48c89818552..7a7ab4b8de8c 100644 --- a/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/sample/Chat.java +++ b/src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/sample/Chat.java @@ -8,7 +8,6 @@ import com.microsoft.signalr.HubConnection; import com.microsoft.signalr.HubConnectionBuilder; - public class Chat { public static void main(final String[] args) throws Exception { System.out.println("Enter the URL of the SignalR Chat you want to join"); @@ -33,7 +32,7 @@ public static void main(final String[] args) throws Exception { while (!message.equals("leave")) { // Scans the next token of the input as an int. message = reader.nextLine(); - hubConnection.send("Send", message); + hubConnection.send("Send", "Java", message); } hubConnection.stop().blockingAwait();