Skip to content

Commit 020c78b

Browse files
authored
State management improvements (#9)
* version is empty by default * avoid base64 * properly handle temporary errors * handle errors during unsubscribe * bump to 0.0.7
1 parent 72a1bf2 commit 020c78b

File tree

9 files changed

+85
-30
lines changed

9 files changed

+85
-30
lines changed

src/Centrifugal.Centrifuge/Centrifugal.Centrifuge.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
<!-- Package Information -->
1717
<PackageId>Centrifugal.Centrifuge</PackageId>
18-
<Version>0.0.6</Version>
18+
<Version>0.0.7</Version>
1919
<Authors>Centrifugal Labs</Authors>
2020
<Company>Centrifugal Labs</Company>
2121
<Description>

src/Centrifugal.Centrifuge/Client.cs

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public class CentrifugeClient : IDisposable, IAsyncDisposable
110110
public event EventHandler<CentrifugeDisconnectedEventArgs>? Disconnected;
111111

112112
/// <summary>
113-
/// Event raised when an error occurs.
113+
/// Event raised when an error occurs. Mostly for the logging purposes.
114114
/// </summary>
115115
public event EventHandler<CentrifugeErrorEventArgs>? Error;
116116

@@ -814,7 +814,16 @@ private async Task StartConnectingAsync(int code, string reason)
814814

815815
private async Task CreateTransportAsync()
816816
{
817-
_transport?.Dispose();
817+
// Unsubscribe from old transport events before disposing to prevent race conditions
818+
if (_transport != null)
819+
{
820+
_transport.Opened -= OnTransportOpened;
821+
_transport.MessageReceived -= OnTransportMessage;
822+
_transport.Closed -= OnTransportClosed;
823+
_transport.Error -= OnTransportError;
824+
_transport.Dispose();
825+
_transport = null;
826+
}
818827

819828
ITransport transport;
820829

@@ -1489,7 +1498,7 @@ private async Task HandleTransportClosedAsync(TransportClosedEventArgs e)
14891498
string reason = e.Reason;
14901499
_logger?.LogDebug($"HandleTransportClosedAsync - processing with code: {code}, reason: '{reason}'");
14911500

1492-
// Check for non-reconnectable disconnect codes (matching centrifuge-js behavior)
1501+
// Check for non-reconnectable disconnect codes
14931502
// Codes 3500-3999 and 4500-4999 mean permanent disconnect
14941503
// Also BadProtocol, Unauthorized, and MessageSizeLimit are permanent
14951504
if (e.Code.HasValue)
@@ -1537,7 +1546,6 @@ private async Task HandleTransportClosedAsync(TransportClosedEventArgs e)
15371546
SetState(CentrifugeClientState.Connecting);
15381547

15391548
// Move all subscribed subscriptions to subscribing state BEFORE emitting connecting event
1540-
// (matching centrifuge-js behavior - see _disconnect method)
15411549
foreach (var sub in _subscriptions.Values)
15421550
{
15431551
sub.MoveToSubscribing(CentrifugeSubscribingCodes.TransportClosed, "transport closed");
@@ -1564,7 +1572,7 @@ private void OnTransportError(object? sender, Exception e)
15641572

15651573
internal async Task HandleSubscribeTimeoutAsync()
15661574
{
1567-
// Subscribe timeout triggers client disconnect with reconnect, matching centrifuge-js behavior
1575+
// Subscribe timeout triggers client disconnect with reconnect
15681576
if (_state == CentrifugeClientState.Disconnected)
15691577
{
15701578
return;
@@ -1578,6 +1586,22 @@ internal async Task HandleSubscribeTimeoutAsync()
15781586
await ScheduleReconnectAsync().ConfigureAwait(false);
15791587
}
15801588

1589+
internal async Task HandleUnsubscribeErrorAsync()
1590+
{
1591+
// Unsubscribe error triggers client disconnect with reconnect (matching centrifuge-js behavior)
1592+
if (_state == CentrifugeClientState.Disconnected)
1593+
{
1594+
return;
1595+
}
1596+
1597+
await StartConnectingAsync(CentrifugeConnectingCodes.UnsubscribeError, "unsubscribe error").ConfigureAwait(false);
1598+
1599+
// Properly clean up transport before reconnecting
1600+
await CleanupTransportAsync().ConfigureAwait(false);
1601+
1602+
await ScheduleReconnectAsync().ConfigureAwait(false);
1603+
}
1604+
15811605
/// <summary>
15821606
/// Cleans up the current transport (unsubscribes events, closes, disposes).
15831607
/// This should be called before reconnecting to prevent duplicate connections.

src/Centrifugal.Centrifuge/ClientOptions.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ namespace Centrifugal.Centrifuge
1111
public class CentrifugeClientOptions
1212
{
1313
/// <summary>
14-
/// Gets or sets the initial connection token (JWT).
14+
/// Gets or sets the initial connection token (JWT). For proper JWT expiration handling
15+
/// GetToken must be also set.
1516
/// </summary>
1617
public string? Token { get; set; }
1718

@@ -35,9 +36,9 @@ public class CentrifugeClientOptions
3536

3637
/// <summary>
3738
/// Gets or sets the client version.
38-
/// Default is the assembly version.
39+
/// By default not set.
3940
/// </summary>
40-
public string Version { get; set; } = typeof(CentrifugeClient).Assembly.GetName().Version?.ToString() ?? "1.0.0";
41+
public string Version { get; set; } = "";
4142

4243
/// <summary>
4344
/// Gets or sets the minimum delay between reconnect attempts.

src/Centrifugal.Centrifuge/Subscription.cs

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,24 @@ internal async Task SendSubscribeIfNeededAsync()
488488
{
489489
await SetUnsubscribedAsync(CentrifugeUnsubscribedCodes.Unauthorized, "unauthorized").ConfigureAwait(false);
490490
}
491+
catch (CentrifugeException ex)
492+
{
493+
OnError("subscribe", ex);
494+
// Temporary errors or token expired (109) - schedule resubscribe
495+
// Permanent errors - unsubscribe (matching centrifuge-js behavior)
496+
if (ex.Code < 100 || ex.Code == 109 || ex.Temporary)
497+
{
498+
if (ex.Code == 109)
499+
{
500+
_refreshRequired = true;
501+
}
502+
await ScheduleResubscribeAsync().ConfigureAwait(false);
503+
}
504+
else
505+
{
506+
await SetUnsubscribedAsync(ex.Code, ex.Message).ConfigureAwait(false);
507+
}
508+
}
491509
catch (Exception ex)
492510
{
493511
OnError("subscribe", ex);
@@ -528,6 +546,24 @@ private async Task StartSubscribingAsync(int code, string reason)
528546
{
529547
await SetUnsubscribedAsync(CentrifugeUnsubscribedCodes.Unauthorized, "unauthorized").ConfigureAwait(false);
530548
}
549+
catch (CentrifugeException ex)
550+
{
551+
OnError("subscribe", ex);
552+
// Temporary errors or token expired (109) - schedule resubscribe
553+
// Permanent errors - unsubscribe (matching centrifuge-js behavior)
554+
if (ex.Code < 100 || ex.Code == 109 || ex.Temporary)
555+
{
556+
if (ex.Code == 109)
557+
{
558+
_refreshRequired = true;
559+
}
560+
await ScheduleResubscribeAsync().ConfigureAwait(false);
561+
}
562+
else
563+
{
564+
await SetUnsubscribedAsync(ex.Code, ex.Message).ConfigureAwait(false);
565+
}
566+
}
531567
catch (Exception ex)
532568
{
533569
OnError("subscribe", ex);
@@ -831,7 +867,8 @@ internal async Task SetUnsubscribedAsync(int code, string reason)
831867
}
832868
catch
833869
{
834-
// Ignore errors during unsubscribe
870+
// Unsubscribe error triggers client disconnect with reconnect (matching centrifuge-js behavior)
871+
await _client.HandleUnsubscribeErrorAsync().ConfigureAwait(false);
835872
}
836873
}
837874
}

src/Centrifugal.Centrifuge/Transports/BrowserHttpStreamTransport.cs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -233,19 +233,17 @@ public void OnOpen()
233233
/// <summary>
234234
/// JavaScript callback when HTTP stream receives a chunk.
235235
/// </summary>
236-
/// <param name="base64Chunk">Chunk data as base64-encoded string.</param>
236+
/// <param name="chunk">Chunk data as byte array.</param>
237237
[JSInvokable]
238-
public void OnChunk(string base64Chunk)
238+
public void OnChunk(byte[] chunk)
239239
{
240-
if (string.IsNullOrEmpty(base64Chunk))
240+
if (chunk == null || chunk.Length == 0)
241241
{
242242
return;
243243
}
244244

245245
try
246246
{
247-
// Decode base64 to byte array
248-
byte[] chunk = Convert.FromBase64String(base64Chunk);
249247

250248
lock (_bufferLock)
251249
{

src/Centrifugal.Centrifuge/Transports/BrowserWebSocketTransport.cs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -219,19 +219,17 @@ public void OnOpen()
219219
/// <summary>
220220
/// JavaScript callback when WebSocket receives a message.
221221
/// </summary>
222-
/// <param name="base64Data">Message data as base64-encoded string.</param>
222+
/// <param name="data">Message data as byte array.</param>
223223
[JSInvokable]
224-
public void OnMessage(string base64Data)
224+
public void OnMessage(byte[] data)
225225
{
226-
if (string.IsNullOrEmpty(base64Data))
226+
if (data == null || data.Length == 0)
227227
{
228228
return;
229229
}
230230

231231
try
232232
{
233-
// Decode base64 to byte array
234-
byte[] data = Convert.FromBase64String(base64Data);
235233

236234
// Process varint-delimited messages
237235
using var ms = new MemoryStream(data);

src/Centrifugal.Centrifuge/Transports/ITransport.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ internal interface ITransport : IDisposable
2121

2222
/// <summary>
2323
/// Gets whether this transport uses emulation mode.
24-
/// Emulation mode is used for unidirectional transports (SSE, HTTP Stream)
24+
/// Emulation mode is used for unidirectional transports (HTTP Stream at this point)
2525
/// where sends go through a separate emulation endpoint.
2626
/// </summary>
2727
bool UsesEmulation { get; }

src/Centrifugal.Centrifuge/wwwroot/centrifuge-httpstream.js

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,8 @@ window.CentrifugeHttpStream = {
107107
break;
108108
}
109109

110-
// Send chunk to .NET as base64 string (JSInterop limitation)
111-
const base64 = btoa(String.fromCharCode.apply(null, value));
112-
dotnetRef.invokeMethodAsync('OnChunk', base64);
110+
// Pass Uint8Array directly - Blazor marshals it to byte[]
111+
dotnetRef.invokeMethodAsync('OnChunk', value);
113112
}
114113
} catch (error) {
115114
self.debugLog(debug, '[CentrifugeHttpStream] Read loop error for stream', id, ':', error.name, error.message);

src/Centrifugal.Centrifuge/wwwroot/centrifuge-websocket.js

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,14 @@ window.CentrifugeWebSocket = {
4242

4343
socket.onmessage = (event) => {
4444
if (event.data instanceof ArrayBuffer) {
45-
// Convert ArrayBuffer to base64 for .NET interop
45+
// Pass Uint8Array directly - Blazor marshals it to byte[]
4646
const bytes = new Uint8Array(event.data);
47-
const base64 = btoa(String.fromCharCode.apply(null, bytes));
48-
dotnetRef.invokeMethodAsync('OnMessage', base64);
47+
dotnetRef.invokeMethodAsync('OnMessage', bytes);
4948
} else if (typeof event.data === 'string') {
50-
// Text message - convert to UTF-8 bytes then base64
49+
// Text message - convert to UTF-8 bytes
5150
const encoder = new TextEncoder();
5251
const bytes = encoder.encode(event.data);
53-
const base64 = btoa(String.fromCharCode.apply(null, bytes));
54-
dotnetRef.invokeMethodAsync('OnMessage', base64);
52+
dotnetRef.invokeMethodAsync('OnMessage', bytes);
5553
}
5654
};
5755

0 commit comments

Comments
 (0)