Skip to content

Commit db77380

Browse files
Java client connection state part 1 (#24166)
1 parent f2c3c11 commit db77380

File tree

4 files changed

+119
-78
lines changed

4 files changed

+119
-78
lines changed

src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnection.java

Lines changed: 83 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public class HubConnection implements AutoCloseable {
6666
private final int negotiateVersion = 1;
6767
private final Logger logger = LoggerFactory.getLogger(HubConnection.class);
6868
private ScheduledExecutorService handshakeTimeout = null;
69+
private Completable start;
6970

7071
/**
7172
* Sets the server timeout interval for the connection.
@@ -341,83 +342,99 @@ public void setBaseUrl(String url) {
341342
* @return A Completable that completes when the connection has been established.
342343
*/
343344
public Completable start() {
344-
if (hubConnectionState != HubConnectionState.DISCONNECTED) {
345-
return Completable.complete();
346-
}
347-
348-
handshakeResponseSubject = CompletableSubject.create();
349-
handshakeReceived = false;
350-
CompletableSubject tokenCompletable = CompletableSubject.create();
351-
localHeaders.put(UserAgentHelper.getUserAgentName(), UserAgentHelper.createUserAgentString());
352-
if (headers != null) {
353-
this.localHeaders.putAll(headers);
354-
}
345+
CompletableSubject localStart = CompletableSubject.create();
355346

356-
accessTokenProvider.subscribe(token -> {
357-
if (token != null && !token.isEmpty()) {
358-
this.localHeaders.put("Authorization", "Bearer " + token);
347+
hubConnectionStateLock.lock();
348+
try {
349+
if (hubConnectionState != HubConnectionState.DISCONNECTED) {
350+
logger.debug("The connection is in the '{}' state. Waiting for in-progress start to complete or completing this start immediately.", hubConnectionState);
351+
return start;
359352
}
360-
tokenCompletable.onComplete();
361-
}, error -> {
362-
tokenCompletable.onError(error);
363-
});
364353

365-
stopError = null;
366-
Single<NegotiateResponse> negotiate = null;
367-
if (!skipNegotiate) {
368-
negotiate = tokenCompletable.andThen(Single.defer(() -> startNegotiate(baseUrl, 0)));
369-
} else {
370-
negotiate = tokenCompletable.andThen(Single.defer(() -> Single.just(new NegotiateResponse(baseUrl))));
371-
}
354+
hubConnectionState = HubConnectionState.CONNECTING;
355+
start = localStart;
372356

373-
CompletableSubject start = CompletableSubject.create();
357+
handshakeResponseSubject = CompletableSubject.create();
358+
handshakeReceived = false;
359+
CompletableSubject tokenCompletable = CompletableSubject.create();
360+
localHeaders.put(UserAgentHelper.getUserAgentName(), UserAgentHelper.createUserAgentString());
361+
if (headers != null) {
362+
this.localHeaders.putAll(headers);
363+
}
374364

375-
negotiate.flatMapCompletable(negotiateResponse -> {
376-
logger.debug("Starting HubConnection.");
377-
if (transport == null) {
378-
Single<String> tokenProvider = negotiateResponse.getAccessToken() != null ? Single.just(negotiateResponse.getAccessToken()) : accessTokenProvider;
379-
switch (transportEnum) {
380-
case LONG_POLLING:
381-
transport = new LongPollingTransport(localHeaders, httpClient, tokenProvider);
382-
break;
383-
default:
384-
transport = new WebSocketTransport(localHeaders, httpClient);
365+
accessTokenProvider.subscribe(token -> {
366+
if (token != null && !token.isEmpty()) {
367+
this.localHeaders.put("Authorization", "Bearer " + token);
385368
}
369+
tokenCompletable.onComplete();
370+
}, error -> {
371+
tokenCompletable.onError(error);
372+
});
373+
374+
stopError = null;
375+
Single<NegotiateResponse> negotiate = null;
376+
if (!skipNegotiate) {
377+
negotiate = tokenCompletable.andThen(Single.defer(() -> startNegotiate(baseUrl, 0)));
378+
} else {
379+
negotiate = tokenCompletable.andThen(Single.defer(() -> Single.just(new NegotiateResponse(baseUrl))));
386380
}
387381

388-
transport.setOnReceive(this.callback);
389-
transport.setOnClose((message) -> stopConnection(message));
390-
391-
return transport.start(negotiateResponse.getFinalUrl()).andThen(Completable.defer(() -> {
392-
ByteBuffer handshake = HandshakeProtocol.createHandshakeRequestMessage(
393-
new HandshakeRequestMessage(protocol.getName(), protocol.getVersion()));
394-
395-
connectionState = new ConnectionState(this);
396-
397-
return transport.send(handshake).andThen(Completable.defer(() -> {
398-
timeoutHandshakeResponse(handshakeResponseTimeout, TimeUnit.MILLISECONDS);
399-
return handshakeResponseSubject.andThen(Completable.defer(() -> {
400-
hubConnectionStateLock.lock();
401-
try {
402-
hubConnectionState = HubConnectionState.CONNECTED;
403-
logger.info("HubConnection started.");
404-
resetServerTimeout();
405-
//Don't send pings if we're using long polling.
406-
if (transportEnum != TransportEnum.LONG_POLLING) {
407-
activatePingTimer();
382+
negotiate.flatMapCompletable(negotiateResponse -> {
383+
logger.debug("Starting HubConnection.");
384+
if (transport == null) {
385+
Single<String> tokenProvider = negotiateResponse.getAccessToken() != null ? Single.just(negotiateResponse.getAccessToken()) : accessTokenProvider;
386+
switch (transportEnum) {
387+
case LONG_POLLING:
388+
transport = new LongPollingTransport(localHeaders, httpClient, tokenProvider);
389+
break;
390+
default:
391+
transport = new WebSocketTransport(localHeaders, httpClient);
392+
}
393+
}
394+
395+
transport.setOnReceive(this.callback);
396+
transport.setOnClose((message) -> stopConnection(message));
397+
398+
return transport.start(negotiateResponse.getFinalUrl()).andThen(Completable.defer(() -> {
399+
ByteBuffer handshake = HandshakeProtocol.createHandshakeRequestMessage(
400+
new HandshakeRequestMessage(protocol.getName(), protocol.getVersion()));
401+
402+
connectionState = new ConnectionState(this);
403+
404+
return transport.send(handshake).andThen(Completable.defer(() -> {
405+
timeoutHandshakeResponse(handshakeResponseTimeout, TimeUnit.MILLISECONDS);
406+
return handshakeResponseSubject.andThen(Completable.defer(() -> {
407+
hubConnectionStateLock.lock();
408+
try {
409+
hubConnectionState = HubConnectionState.CONNECTED;
410+
logger.info("HubConnection started.");
411+
resetServerTimeout();
412+
//Don't send pings if we're using long polling.
413+
if (transportEnum != TransportEnum.LONG_POLLING) {
414+
activatePingTimer();
415+
}
416+
} finally {
417+
hubConnectionStateLock.unlock();
408418
}
409-
} finally {
410-
hubConnectionStateLock.unlock();
411-
}
412419

413-
return Completable.complete();
420+
return Completable.complete();
421+
}));
414422
}));
415423
}));
416-
}));
417-
// subscribe makes this a "hot" completable so this runs immediately
418-
}).subscribeWith(start);
424+
// subscribe makes this a "hot" completable so this runs immediately
425+
}).subscribe(() -> {
426+
localStart.onComplete();
427+
}, error -> {
428+
hubConnectionStateLock.lock();
429+
hubConnectionState = HubConnectionState.DISCONNECTED;
430+
hubConnectionStateLock.unlock();
431+
localStart.onError(error);
432+
});
433+
} finally {
434+
hubConnectionStateLock.unlock();
435+
}
419436

420-
return start;
437+
return localStart;
421438
}
422439

423440
private void activatePingTimer() {
@@ -445,8 +462,8 @@ public void run() {
445462
}
446463

447464
private Single<NegotiateResponse> startNegotiate(String url, int negotiateAttempts) {
448-
if (hubConnectionState != HubConnectionState.DISCONNECTED) {
449-
return Single.just(null);
465+
if (hubConnectionState != HubConnectionState.CONNECTING) {
466+
throw new RuntimeException("HubConnection trying to negotiate when not in the CONNECTING state.");
450467
}
451468

452469
return handleNegotiate(url).flatMap(response -> {

src/SignalR/clients/java/signalr/src/main/java/com/microsoft/signalr/HubConnectionState.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,5 @@
99
public enum HubConnectionState {
1010
CONNECTED,
1111
DISCONNECTED,
12+
CONNECTING,
1213
}

src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/HubConnectionTest.java

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.reactivex.Observable;
2525
import io.reactivex.Single;
2626
import io.reactivex.disposables.Disposable;
27+
import io.reactivex.schedulers.Schedulers;
2728
import io.reactivex.subjects.CompletableSubject;
2829
import io.reactivex.subjects.PublishSubject;
2930
import io.reactivex.subjects.ReplaySubject;
@@ -2577,9 +2578,10 @@ public void receiveHandshakeResponseAndMessage() {
25772578
value.getAndUpdate((val) -> val + 1);
25782579
});
25792580

2581+
SingleSubject<ByteBuffer> handshakeMessageTask = mockTransport.getNextSentMessage();
25802582
// On start we're going to receive the handshake response and also an invocation in the same payload.
25812583
hubConnection.start();
2582-
mockTransport.getStartTask().timeout(1, TimeUnit.SECONDS).blockingAwait();
2584+
ByteBuffer sentMessage = handshakeMessageTask.timeout(1, TimeUnit.SECONDS).blockingGet();
25832585
String expectedSentMessage = "{\"protocol\":\"json\",\"version\":1}" + RECORD_SEPARATOR;
25842586
assertEquals(expectedSentMessage, TestUtils.byteBufferToString(mockTransport.getSentMessages()[0]));
25852587

@@ -2647,15 +2649,43 @@ public void hubConnectionClosesAndRunsOnClosedCallbackAfterCloseMessageWithError
26472649
}
26482650

26492651
@Test
2650-
public void callingStartOnStartedHubConnectionNoOps() {
2652+
public void callingStartOnStartedHubConnectionNoops() {
26512653
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com");
26522654
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
26532655
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
26542656

26552657
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
26562658
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
26572659

2658-
hubConnection.stop();
2660+
hubConnection.stop().timeout(1, TimeUnit.SECONDS).blockingAwait();
2661+
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
2662+
}
2663+
2664+
@Test
2665+
public void callingStartOnStartingHubConnectionWaitsForOriginalStart() {
2666+
CompletableSubject startedAccessToken = CompletableSubject.create();
2667+
CompletableSubject continueAccessToken = CompletableSubject.create();
2668+
HubConnection hubConnection = HubConnectionBuilder.create("http://example.com")
2669+
.withTransportImplementation(new MockTransport(true))
2670+
.withHttpClient(new TestHttpClient())
2671+
.withAccessTokenProvider(Single.defer(() -> {
2672+
startedAccessToken.onComplete();
2673+
continueAccessToken.timeout(1, TimeUnit.SECONDS).blockingAwait();
2674+
return Single.just("test");
2675+
}).subscribeOn(Schedulers.newThread()))
2676+
.shouldSkipNegotiate(true)
2677+
.build();
2678+
Completable start = hubConnection.start();
2679+
startedAccessToken.timeout(1, TimeUnit.SECONDS).blockingAwait();
2680+
assertEquals(HubConnectionState.CONNECTING, hubConnection.getConnectionState());
2681+
2682+
Completable start2 = hubConnection.start();
2683+
continueAccessToken.onComplete();
2684+
start.timeout(1, TimeUnit.SECONDS).blockingAwait();
2685+
start2.timeout(1, TimeUnit.SECONDS).blockingAwait();
2686+
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
2687+
2688+
hubConnection.stop().timeout(1, TimeUnit.SECONDS).blockingAwait();
26592689
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
26602690
}
26612691

@@ -3595,19 +3625,13 @@ public void userSetAuthHeaderIsNotClearedAfterRedirect() {
35953625
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
35963626
hubConnection.stop().blockingAwait();
35973627
assertEquals("ExampleValue", beforeRedirectHeader.get());
3598-
3599-
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
3600-
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
36013628
assertEquals("Bearer redirectToken", afterRedirectHeader.get());
36023629

36033630
// Making sure you can do this after restarting the HubConnection.
36043631
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
36053632
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
36063633
hubConnection.stop().blockingAwait();
36073634
assertEquals("ExampleValue", beforeRedirectHeader.get());
3608-
3609-
hubConnection.start().timeout(1, TimeUnit.SECONDS).blockingAwait();
3610-
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());
36113635
assertEquals("Bearer redirectToken", afterRedirectHeader.get());
36123636
}
36133637

@@ -3699,7 +3723,7 @@ public void non200FromNegotiateThrowsError() {
36993723
}
37003724

37013725
@Test
3702-
public void hubConnectionCloseCallsStop() throws Exception {
3726+
public void hubConnectionCloseCallsStop() {
37033727
MockTransport mockTransport = new MockTransport();
37043728
TestHttpClient client = new TestHttpClient()
37053729
.on("POST", "http://example.com/negotiate?negotiateVersion=1", (req) -> Single.just(new HttpResponse(200, "", TestUtils.stringToByteBuffer("{\"url\":\"http://testexample.com/\"}"))))

src/SignalR/clients/java/signalr/src/test/java/com/microsoft/signalr/sample/Chat.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import com.microsoft.signalr.HubConnection;
99
import com.microsoft.signalr.HubConnectionBuilder;
1010

11-
1211
public class Chat {
1312
public static void main(final String[] args) throws Exception {
1413
System.out.println("Enter the URL of the SignalR Chat you want to join");
@@ -33,7 +32,7 @@ public static void main(final String[] args) throws Exception {
3332
while (!message.equals("leave")) {
3433
// Scans the next token of the input as an int.
3534
message = reader.nextLine();
36-
hubConnection.send("Send", message);
35+
hubConnection.send("Send", "Java", message);
3736
}
3837

3938
hubConnection.stop().blockingAwait();

0 commit comments

Comments
 (0)