Skip to content

Commit 8bfb367

Browse files
Add buffer limit option for SignalR Stateful Reconnect (#49978)
1 parent ee13aad commit 8bfb367

File tree

17 files changed

+222
-50
lines changed

17 files changed

+222
-50
lines changed

src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ public partial class HubConnection : IAsyncDisposable
5656
/// </summary>
5757
public static readonly TimeSpan DefaultKeepAliveInterval = TimeSpan.FromSeconds(15);
5858

59+
// Default amount of bytes we'll buffer when using Stateful Reconnect until applying backpressure to sends from the client.
60+
internal const long DefaultStatefulReconnectBufferSize = 100_000;
61+
5962
// The receive loop has a single reader and single writer at a time so optimize the channel for that
6063
private static readonly UnboundedChannelOptions _receiveLoopOptions = new UnboundedChannelOptions
6164
{
@@ -1899,7 +1902,9 @@ public ConnectionState(ConnectionContext connection, HubConnection hubConnection
18991902

19001903
if (Connection.Features.Get<IReconnectFeature>() is IReconnectFeature feature)
19011904
{
1902-
_messageBuffer = new MessageBuffer(connection, hubConnection._protocol);
1905+
_messageBuffer = new MessageBuffer(connection, hubConnection._protocol,
1906+
_hubConnection._serviceProvider.GetService<IOptions<HubConnectionOptions>>()?.Value.StatefulReconnectBufferSize
1907+
?? DefaultStatefulReconnectBufferSize);
19031908

19041909
feature.NotifyOnReconnect = _messageBuffer.Resend;
19051910
}

src/SignalR/clients/csharp/Client.Core/src/HubConnectionOptions.cs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,24 @@
1010
namespace Microsoft.AspNetCore.SignalR.Client;
1111

1212
/// <summary>
13-
/// Configures timeouts for the <see cref="HubConnection" />.
13+
/// Configures options for the <see cref="HubConnection" />.
1414
/// </summary>
15-
internal sealed class HubConnectionOptions
15+
public sealed class HubConnectionOptions
1616
{
1717
/// <summary>
1818
/// Configures ServerTimeout for the <see cref="HubConnection" />.
1919
/// </summary>
20-
public TimeSpan? ServerTimeout { get; set; }
20+
public TimeSpan ServerTimeout { get; set; } = HubConnection.DefaultServerTimeout;
2121

2222
/// <summary>
2323
/// Configures KeepAliveInterval for the <see cref="HubConnection" />.
2424
/// </summary>
25-
public TimeSpan? KeepAliveInterval { get; set; }
25+
public TimeSpan KeepAliveInterval { get; set; } = HubConnection.DefaultKeepAliveInterval;
26+
27+
/// <summary>
28+
/// Amount of serialized messages in bytes we'll buffer when using Stateful Reconnect until applying backpressure to sends from the client.
29+
/// </summary>
30+
/// <remarks>Defaults to 100,000 bytes.</remarks>
31+
public long StatefulReconnectBufferSize { get; set; } = HubConnection.DefaultStatefulReconnectBufferSize;
32+
2633
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
11
#nullable enable
2+
Microsoft.AspNetCore.SignalR.Client.HubConnectionOptions
3+
Microsoft.AspNetCore.SignalR.Client.HubConnectionOptions.HubConnectionOptions() -> void
4+
Microsoft.AspNetCore.SignalR.Client.HubConnectionOptions.KeepAliveInterval.get -> System.TimeSpan
5+
Microsoft.AspNetCore.SignalR.Client.HubConnectionOptions.KeepAliveInterval.set -> void
6+
Microsoft.AspNetCore.SignalR.Client.HubConnectionOptions.ServerTimeout.get -> System.TimeSpan
7+
Microsoft.AspNetCore.SignalR.Client.HubConnectionOptions.ServerTimeout.set -> void
8+
Microsoft.AspNetCore.SignalR.Client.HubConnectionOptions.StatefulReconnectBufferSize.get -> long
9+
Microsoft.AspNetCore.SignalR.Client.HubConnectionOptions.StatefulReconnectBufferSize.set -> void
210
static Microsoft.AspNetCore.SignalR.Client.HubConnectionBuilderExtensions.WithKeepAliveInterval(this Microsoft.AspNetCore.SignalR.Client.IHubConnectionBuilder! hubConnectionBuilder, System.TimeSpan interval) -> Microsoft.AspNetCore.SignalR.Client.IHubConnectionBuilder!
311
static Microsoft.AspNetCore.SignalR.Client.HubConnectionBuilderExtensions.WithServerTimeout(this Microsoft.AspNetCore.SignalR.Client.IHubConnectionBuilder! hubConnectionBuilder, System.TimeSpan timeout) -> Microsoft.AspNetCore.SignalR.Client.IHubConnectionBuilder!

src/SignalR/clients/csharp/Client/test/FunctionalTests/HubConnectionTests.cs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2718,6 +2718,50 @@ public async Task ServerAbortsConnectionWithAckingEnabledNoReconnectAttempted()
27182718
}
27192719
}
27202720

2721+
[Fact]
2722+
public async Task CanSetMessageBufferSizeOnClient()
2723+
{
2724+
var protocol = HubProtocols["json"];
2725+
await using (var server = await StartServer<Startup>())
2726+
{
2727+
const string originalMessage = "SignalR";
2728+
var connectionBuilder = new HubConnectionBuilder()
2729+
.WithLoggerFactory(LoggerFactory)
2730+
.WithUrl(server.Url + "/default", HttpTransportType.WebSockets, o =>
2731+
{
2732+
o.UseAcks = true;
2733+
});
2734+
connectionBuilder.Services.Configure<HubConnectionOptions>(o => o.StatefulReconnectBufferSize = 500);
2735+
connectionBuilder.Services.AddSingleton(protocol);
2736+
var connection = connectionBuilder.Build();
2737+
2738+
try
2739+
{
2740+
await connection.StartAsync().DefaultTimeout();
2741+
var originalConnectionId = connection.ConnectionId;
2742+
2743+
var result = await connection.InvokeAsync<string>(nameof(TestHub.Echo), new string('x', 500)).DefaultTimeout();
2744+
2745+
var resultTask = connection.InvokeAsync<string>(nameof(TestHub.Echo), originalMessage).DefaultTimeout();
2746+
// Waiting for buffer to be unblocked by ack from server
2747+
Assert.False(resultTask.IsCompleted);
2748+
2749+
result = await resultTask;
2750+
2751+
Assert.Equal(originalMessage, result);
2752+
}
2753+
catch (Exception ex)
2754+
{
2755+
LoggerFactory.CreateLogger<HubConnectionTests>().LogError(ex, "{ExceptionType} from test", ex.GetType().FullName);
2756+
throw;
2757+
}
2758+
finally
2759+
{
2760+
await connection.DisposeAsync().DefaultTimeout();
2761+
}
2762+
}
2763+
}
2764+
27212765
private class OneAtATimeSynchronizationContext : SynchronizationContext, IAsyncDisposable
27222766
{
27232767
private readonly Channel<(SendOrPostCallback, object)> _taskQueue = Channel.CreateUnbounded<(SendOrPostCallback, object)>();

src/SignalR/common/Http.Connections.Common/src/NegotiationResponse.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
// The .NET Foundation licenses this file to you under the MIT license.
33

44
using System.Collections.Generic;
5+
using Microsoft.AspNetCore.Connections;
6+
using Microsoft.AspNetCore.Connections.Abstractions;
57

68
namespace Microsoft.AspNetCore.Http.Connections;
79

@@ -46,7 +48,9 @@ public class NegotiationResponse
4648
public string? Error { get; set; }
4749

4850
/// <summary>
49-
///
51+
/// If set, the connection should attempt to reconnect with the same <see cref="BaseConnectionContext.ConnectionId"/> if it disconnects.
52+
/// It should also set <see cref="IReconnectFeature"/> on the <see cref="BaseConnectionContext.Features"/> collection so other layers of the
53+
/// application (like SignalR) can react.
5054
/// </summary>
5155
public bool UseAcking { get; set; }
5256
}

src/SignalR/common/Shared/MessageBuffer.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ internal sealed class MessageBuffer : IDisposable
2121

2222
private readonly ConnectionContext _connection;
2323
private readonly IHubProtocol _protocol;
24+
private readonly long _bufferLimit;
2425
private readonly AckMessage _ackMessage = new(0);
2526
private readonly SequenceMessage _sequenceMessage = new(0);
26-
private readonly Channel<int> _waitForAck = Channel.CreateBounded<int>(new BoundedChannelOptions(1) { FullMode = BoundedChannelFullMode.DropOldest });
27-
private readonly int _bufferLimit = 100 * 1000;
27+
private readonly Channel<long> _waitForAck = Channel.CreateBounded<long>(new BoundedChannelOptions(1) { FullMode = BoundedChannelFullMode.DropOldest });
2828

2929
#if NET8_0_OR_GREATER
3030
private readonly PeriodicTimer _timer = new(TimeSpan.FromSeconds(1));
@@ -46,21 +46,21 @@ internal sealed class MessageBuffer : IDisposable
4646
private object Lock => _buffer;
4747

4848
private LinkedBuffer _buffer;
49-
private int _bufferedByteCount;
49+
private long _bufferedByteCount;
5050

5151
static MessageBuffer()
5252
{
5353
_completedTCS.SetResult(new());
5454
}
5555

56-
// TODO: pass in limits
57-
public MessageBuffer(ConnectionContext connection, IHubProtocol protocol)
56+
public MessageBuffer(ConnectionContext connection, IHubProtocol protocol, long bufferLimit)
5857
{
5958
// TODO: pool
6059
_buffer = new LinkedBuffer();
6160

6261
_connection = connection;
6362
_protocol = protocol;
63+
_bufferLimit = bufferLimit;
6464

6565
#if !NET8_0_OR_GREATER
6666
_timer.Start();
@@ -179,7 +179,7 @@ public void Ack(AckMessage ackMessage)
179179
{
180180
// TODO: what if ackMessage.SequenceId is larger than last sent message?
181181

182-
var newCount = -1;
182+
var newCount = -1L;
183183

184184
lock (Lock)
185185
{
Lines changed: 4 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,24 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT license.
33

4-
using System;
5-
using System.Collections.Generic;
6-
using System.Linq;
7-
using System.Text;
8-
using System.Threading.Tasks;
9-
104
namespace Microsoft.AspNetCore.SignalR.Protocol;
115

126
/// <summary>
13-
/// Represents the ID being acknowledged so we can stop buffering older messages.
7+
/// Represents the ID being acknowledged so older messages do not need to be buffered anymore.
148
/// </summary>
159
public sealed class AckMessage : HubMessage
1610
{
1711
/// <summary>
18-
///
12+
/// Initializes a new instance of the <see cref="AckMessage"/> class.
1913
/// </summary>
20-
/// <param name="sequenceId"></param>
14+
/// <param name="sequenceId">The ID of the last message that was received.</param>
2115
public AckMessage(long sequenceId)
2216
{
2317
SequenceId = sequenceId;
2418
}
2519

2620
/// <summary>
27-
///
28-
/// </summary>
29-
public long SequenceId { get; set; }
30-
}
31-
32-
/// <summary>
33-
/// Represents the restart of the sequence of messages being sent. <see cref="SequenceId"/> is the starting ID of messages being sent, which might be duplicate messages.
34-
/// </summary>
35-
public sealed class SequenceMessage : HubMessage
36-
{
37-
/// <summary>
38-
///
39-
/// </summary>
40-
/// <param name="sequenceId"></param>
41-
public SequenceMessage(long sequenceId)
42-
{
43-
SequenceId = sequenceId;
44-
}
45-
46-
/// <summary>
47-
///
21+
/// The ID of the last message that was received.
4822
/// </summary>
4923
public long SequenceId { get; set; }
5024
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
namespace Microsoft.AspNetCore.SignalR.Protocol;
5+
6+
/// <summary>
7+
/// Represents the restart of the sequence of messages being sent. <see cref="SequenceId"/> is the starting ID of messages being sent, which might be duplicate messages.
8+
/// </summary>
9+
public sealed class SequenceMessage : HubMessage
10+
{
11+
/// <summary>
12+
/// Initializes a new instance of the <see cref="SequenceMessage"/> class.
13+
/// </summary>
14+
/// <param name="sequenceId">Specifies the starting ID for messages that will be received from this point onward.</param>
15+
public SequenceMessage(long sequenceId)
16+
{
17+
SequenceId = sequenceId;
18+
}
19+
20+
/// <summary>
21+
/// The new starting ID of incoming messages.
22+
/// </summary>
23+
public long SequenceId { get; set; }
24+
}

src/SignalR/common/testassets/Tests.Utils/TestClient.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,12 @@ public async Task<CompletionMessage> InvokeAsync(string methodName, params objec
167167
case PingMessage _:
168168
// Pings are ignored
169169
break;
170+
case AckMessage _:
171+
// Ignored for now
172+
break;
173+
case SequenceMessage _:
174+
// Ignored for now
175+
break;
170176
default:
171177
// Message implement ToString so this should be helpful.
172178
throw new NotSupportedException($"TestClient recieved an unexpected message: {message}.");

src/SignalR/server/Core/src/HubConnectionContext.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public partial class HubConnectionContext
4646
private volatile bool _allowReconnect = true;
4747
private readonly int _streamBufferCapacity;
4848
private readonly long? _maxMessageSize;
49+
private readonly long _statefulReconnectBufferSize;
4950
private bool _receivedMessageTimeoutEnabled;
5051
private TimeSpan _receivedMessageElapsed;
5152
private long _receivedMessageTick;
@@ -68,6 +69,7 @@ public HubConnectionContext(ConnectionContext connectionContext, HubConnectionCo
6869
_clientTimeoutInterval = contextOptions.ClientTimeoutInterval;
6970
_streamBufferCapacity = contextOptions.StreamBufferCapacity;
7071
_maxMessageSize = contextOptions.MaximumReceiveMessageSize;
72+
_statefulReconnectBufferSize = contextOptions.StatefulReconnectBufferSize;
7173

7274
_connectionContext = connectionContext;
7375
_logger = loggerFactory.CreateLogger<HubConnectionContext>();
@@ -577,7 +579,7 @@ await WriteHandshakeResponseAsync(new HandshakeResponseMessage(
577579
if (_connectionContext.Features.Get<IReconnectFeature>() is IReconnectFeature feature)
578580
{
579581
_useAcks = true;
580-
_messageBuffer = new MessageBuffer(_connectionContext, Protocol);
582+
_messageBuffer = new MessageBuffer(_connectionContext, Protocol, _statefulReconnectBufferSize);
581583
feature.NotifyOnReconnect = _messageBuffer.Resend;
582584
}
583585
return true;

src/SignalR/server/Core/src/HubConnectionContextOptions.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,9 @@ public class HubConnectionContextOptions
3434
/// Gets or sets the maximum parallel hub method invocations.
3535
/// </summary>
3636
public int MaximumParallelInvocations { get; set; } = 1;
37+
38+
/// <summary>
39+
/// Gets or sets the maximum bytes to buffer per connection when using stateful reconnect.
40+
/// </summary>
41+
internal long StatefulReconnectBufferSize { get; set; } = 100_000;
3742
}

src/SignalR/server/Core/src/HubConnectionHandler.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public class HubConnectionHandler<THub> : ConnectionHandler where THub : Hub
2828
private readonly bool _enableDetailedErrors;
2929
private readonly long? _maximumMessageSize;
3030
private readonly int _maxParallelInvokes;
31+
private readonly long _statefulReconnectBufferSize;
3132

3233
// Internal for testing
3334
internal TimeProvider TimeProvider { get; set; } = TimeProvider.System;
@@ -70,6 +71,7 @@ IServiceScopeFactory serviceScopeFactory
7071
_enableDetailedErrors = _hubOptions.EnableDetailedErrors ?? _enableDetailedErrors;
7172
_maxParallelInvokes = _hubOptions.MaximumParallelInvocationsPerClient;
7273
disableImplicitFromServiceParameters = _hubOptions.DisableImplicitFromServicesParameters;
74+
_statefulReconnectBufferSize = _hubOptions.StatefulReconnectBufferSize;
7375

7476
if (_hubOptions.HubFilters != null)
7577
{
@@ -82,6 +84,7 @@ IServiceScopeFactory serviceScopeFactory
8284
_enableDetailedErrors = _globalHubOptions.EnableDetailedErrors ?? _enableDetailedErrors;
8385
_maxParallelInvokes = _globalHubOptions.MaximumParallelInvocationsPerClient;
8486
disableImplicitFromServiceParameters = _globalHubOptions.DisableImplicitFromServicesParameters;
87+
_statefulReconnectBufferSize = _globalHubOptions.StatefulReconnectBufferSize;
8588

8689
if (_globalHubOptions.HubFilters != null)
8790
{
@@ -123,6 +126,7 @@ public override async Task OnConnectedAsync(ConnectionContext connection)
123126
MaximumReceiveMessageSize = _maximumMessageSize,
124127
TimeProvider = TimeProvider,
125128
MaximumParallelInvocations = _maxParallelInvokes,
129+
StatefulReconnectBufferSize = _statefulReconnectBufferSize,
126130
};
127131

128132
Log.ConnectedStarting(_logger);

src/SignalR/server/Core/src/HubOptions.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,10 @@ public int MaximumParallelInvocationsPerClient
7979
/// False by default. Hub method arguments will be resolved from a DI container if possible.
8080
/// </remarks>
8181
public bool DisableImplicitFromServicesParameters { get; set; }
82+
83+
/// <summary>
84+
/// Gets or sets the maximum bytes to buffer per connection when using stateful reconnect.
85+
/// </summary>
86+
/// <remarks>Defaults to 100,000 bytes.</remarks>
87+
public long StatefulReconnectBufferSize { get; set; } = 100_000;
8288
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
11
#nullable enable
2+
Microsoft.AspNetCore.SignalR.HubOptions.StatefulReconnectBufferSize.get -> long
3+
Microsoft.AspNetCore.SignalR.HubOptions.StatefulReconnectBufferSize.set -> void

src/SignalR/server/SignalR/test/AddSignalRTests.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ public void HubSpecificOptionsHaveSameValuesAsGlobalHubOptions()
112112
Assert.Equal(globalHubOptions.ClientTimeoutInterval, hubOptions.ClientTimeoutInterval);
113113
Assert.Equal(globalHubOptions.MaximumParallelInvocationsPerClient, hubOptions.MaximumParallelInvocationsPerClient);
114114
Assert.Equal(globalHubOptions.DisableImplicitFromServicesParameters, hubOptions.DisableImplicitFromServicesParameters);
115+
Assert.Equal(globalHubOptions.StatefulReconnectBufferSize, hubOptions.StatefulReconnectBufferSize);
115116
Assert.True(hubOptions.UserHasSetValues);
116117
}
117118

@@ -147,6 +148,7 @@ public void UserSpecifiedOptionsRunAfterDefaultOptions()
147148
options.ClientTimeoutInterval = TimeSpan.FromSeconds(1);
148149
options.MaximumParallelInvocationsPerClient = 3;
149150
options.DisableImplicitFromServicesParameters = true;
151+
options.StatefulReconnectBufferSize = 23;
150152
});
151153

152154
var serviceProvider = serviceCollection.BuildServiceProvider();
@@ -161,6 +163,7 @@ public void UserSpecifiedOptionsRunAfterDefaultOptions()
161163
Assert.Equal(3, globalOptions.MaximumParallelInvocationsPerClient);
162164
Assert.Equal(TimeSpan.FromSeconds(1), globalOptions.ClientTimeoutInterval);
163165
Assert.True(globalOptions.DisableImplicitFromServicesParameters);
166+
Assert.Equal(23, globalOptions.StatefulReconnectBufferSize);
164167
}
165168

166169
[Fact]

0 commit comments

Comments
 (0)