Skip to content

Commit 3535cfb

Browse files
authored
Added the ability to configure the buffer sizes on the client (#35107)
* Added the ability to configure the buffer sizes on the client - When data comes in from the transport or sent to the transport, we use the default pipe buffer size on the client (which is 65K). This change makes it possible to configure both the transport buffer and the application buffer so that it can be configured based on users scenarios. - Raise the default limit from 65K to 1MB on the client side and raise it from 32K (old pipe limit) to 65K on the server side. - Added tests
1 parent 59cca98 commit 3535cfb

14 files changed

+179
-41
lines changed

src/SignalR/clients/csharp/Client/test/UnitTests/HttpConnectionFactoryTests.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ public void ShallowCopyHttpConnectionOptionsCopiesAllPublicProperties()
9898
{ $"{nameof(HttpConnectionOptions.DefaultTransferFormat)}", TransferFormat.Text },
9999
{ $"{nameof(HttpConnectionOptions.WebSocketConfiguration)}", webSocketConfig },
100100
{ $"{nameof(HttpConnectionOptions.WebSocketFactory)}", webSocketFactory },
101+
{ $"{nameof(HttpConnectionOptions.ApplicationMaxBufferSize)}", 1L * 1024 * 1024 },
102+
{ $"{nameof(HttpConnectionOptions.TransportMaxBufferSize)}", 1L * 1024 * 1024 },
101103
};
102104

103105
var options = new HttpConnectionOptions();

src/SignalR/clients/csharp/Client/test/UnitTests/HttpConnectionTests.ConnectionLifecycle.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ bool ExpectedErrors(WriteContext writeContext)
337337
return Task.FromResult(ResponseUtils.CreateResponse(HttpStatusCode.InternalServerError));
338338
});
339339

340-
var sse = new ServerSentEventsTransport(new HttpClient(httpHandler), LoggerFactory);
340+
var sse = new ServerSentEventsTransport(new HttpClient(httpHandler), loggerFactory: LoggerFactory);
341341

342342
await WithConnectionAsync(
343343
CreateConnection(httpHandler, loggerFactory: LoggerFactory, transport: sse),
@@ -363,7 +363,7 @@ public async Task SSEWaitsForResponseToStart()
363363
return ResponseUtils.CreateResponse(HttpStatusCode.Accepted);
364364
});
365365

366-
var sse = new ServerSentEventsTransport(new HttpClient(httpHandler), LoggerFactory);
366+
var sse = new ServerSentEventsTransport(new HttpClient(httpHandler), loggerFactory: LoggerFactory);
367367

368368
await WithConnectionAsync(
369369
CreateConnection(httpHandler, loggerFactory: LoggerFactory, transport: sse),
@@ -434,7 +434,8 @@ public async Task CancellationTokenFromStartPassedToTransport()
434434

435435
await WithConnectionAsync(
436436
CreateConnection(httpHandler,
437-
transport: new TestTransport(onTransportStart: () => {
437+
transport: new TestTransport(onTransportStart: () =>
438+
{
438439
// Cancel the token when the transport is starting which will fail the startTask.
439440
cts.Cancel();
440441
return Task.CompletedTask;
@@ -462,7 +463,8 @@ public async Task CanceledCancellationTokenPassedToStartThrows()
462463

463464
await WithConnectionAsync(
464465
CreateConnection(httpHandler,
465-
transport: new TestTransport(onTransportStart: () => {
466+
transport: new TestTransport(onTransportStart: () =>
467+
{
466468
transportStartCalled = true;
467469
return Task.CompletedTask;
468470
})),
@@ -493,7 +495,7 @@ bool ExpectedErrors(WriteContext writeContext)
493495
throw new OperationCanceledException("Cancel SSE Start.");
494496
});
495497

496-
var sse = new ServerSentEventsTransport(new HttpClient(httpHandler), LoggerFactory);
498+
var sse = new ServerSentEventsTransport(new HttpClient(httpHandler), loggerFactory: LoggerFactory);
497499

498500
await WithConnectionAsync(
499501
CreateConnection(httpHandler, loggerFactory: LoggerFactory, transport: sse, transportType: HttpTransportType.ServerSentEvents),

src/SignalR/clients/csharp/Client/test/UnitTests/HttpConnectionTests.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using System.Net.Http;
88
using System.Security.Cryptography.X509Certificates;
99
using System.Threading.Tasks;
10+
using Microsoft.AspNetCore.Connections;
1011
using Microsoft.AspNetCore.Http.Connections;
1112
using Microsoft.AspNetCore.Http.Connections.Client;
1213
using Microsoft.AspNetCore.SignalR.Tests;
@@ -21,6 +22,25 @@ namespace Microsoft.AspNetCore.SignalR.Client.Tests
2122
{
2223
public partial class HttpConnectionTests : VerifiableLoggedTest
2324
{
25+
[Fact]
26+
public void HttpConnectionOptionsDefaults()
27+
{
28+
var httpOptions = new HttpConnectionOptions();
29+
Assert.Equal(1024 * 1024, httpOptions.TransportMaxBufferSize);
30+
Assert.Equal(1024 * 1024, httpOptions.ApplicationMaxBufferSize);
31+
Assert.Equal(TimeSpan.FromSeconds(5), httpOptions.CloseTimeout);
32+
Assert.Equal(TransferFormat.Binary, httpOptions.DefaultTransferFormat);
33+
Assert.Equal(HttpTransports.All, httpOptions.Transports);
34+
}
35+
36+
[Fact]
37+
public void HttpConnectionOptionsNegativeBufferSizeThrows()
38+
{
39+
var httpOptions = new HttpConnectionOptions();
40+
Assert.Throws<ArgumentOutOfRangeException>(() => httpOptions.TransportMaxBufferSize = -1);
41+
Assert.Throws<ArgumentOutOfRangeException>(() => httpOptions.ApplicationMaxBufferSize = -1);
42+
}
43+
2444
[Fact]
2545
public void CannotCreateConnectionWithNullUrl()
2646
{

src/SignalR/clients/csharp/Client/test/UnitTests/LongPollingTransportTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ public async Task StopTransportWhenConnectionAlreadyStoppedOnServer()
254254
{
255255
using (var httpClient = new HttpClient(mockHttpHandler.Object))
256256
{
257-
var longPollingTransport = new LongPollingTransport(httpClient, LoggerFactory);
257+
var longPollingTransport = new LongPollingTransport(httpClient, loggerFactory: LoggerFactory);
258258

259259
await longPollingTransport.StartAsync(TestUri, TransferFormat.Binary).DefaultTimeout();
260260

src/SignalR/clients/csharp/Client/test/UnitTests/ServerSentEventsTransportTests.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public async Task CanStartStopSSETransport()
5252
using (var httpClient = new HttpClient(mockHttpHandler.Object))
5353
using (StartVerifiableLog())
5454
{
55-
var sseTransport = new ServerSentEventsTransport(httpClient, LoggerFactory);
55+
var sseTransport = new ServerSentEventsTransport(httpClient, loggerFactory: LoggerFactory);
5656
await sseTransport.StartAsync(
5757
new Uri("http://fakeuri.org"), TransferFormat.Text).DefaultTimeout();
5858

@@ -97,7 +97,7 @@ public async Task SSETransportStopsSendAndReceiveLoopsWhenTransportStopped()
9797
using (var httpClient = new HttpClient(mockHttpHandler.Object))
9898
using (StartVerifiableLog())
9999
{
100-
var sseTransport = new ServerSentEventsTransport(httpClient, LoggerFactory);
100+
var sseTransport = new ServerSentEventsTransport(httpClient, loggerFactory: LoggerFactory);
101101

102102
Task transportActiveTask;
103103
try
@@ -150,7 +150,7 @@ public async Task SSETransportStopsWithErrorIfServerSendsIncompleteResults()
150150
using (var httpClient = new HttpClient(mockHttpHandler.Object))
151151
using (StartVerifiableLog())
152152
{
153-
var sseTransport = new ServerSentEventsTransport(httpClient, LoggerFactory);
153+
var sseTransport = new ServerSentEventsTransport(httpClient, loggerFactory: LoggerFactory);
154154

155155
await sseTransport.StartAsync(
156156
new Uri("http://fakeuri.org"), TransferFormat.Text).DefaultTimeout();
@@ -208,7 +208,7 @@ bool ExpectedErrors(WriteContext writeContext)
208208
using (var httpClient = new HttpClient(mockHttpHandler.Object))
209209
using (StartVerifiableLog(expectedErrorsFilter: ExpectedErrors))
210210
{
211-
var sseTransport = new ServerSentEventsTransport(httpClient, LoggerFactory);
211+
var sseTransport = new ServerSentEventsTransport(httpClient, loggerFactory: LoggerFactory);
212212

213213
await sseTransport.StartAsync(
214214
new Uri("http://fakeuri.org"), TransferFormat.Text).DefaultTimeout();
@@ -258,7 +258,7 @@ public async Task SSETransportStopsIfChannelClosed()
258258
using (var httpClient = new HttpClient(mockHttpHandler.Object))
259259
using (StartVerifiableLog())
260260
{
261-
var sseTransport = new ServerSentEventsTransport(httpClient, LoggerFactory);
261+
var sseTransport = new ServerSentEventsTransport(httpClient, loggerFactory: LoggerFactory);
262262

263263
await sseTransport.StartAsync(
264264
new Uri("http://fakeuri.org"), TransferFormat.Text).DefaultTimeout();
@@ -285,7 +285,7 @@ public async Task SSETransportStopsIfTheServerClosesTheStream()
285285
using (var httpClient = new HttpClient(mockHttpHandler.Object))
286286
using (StartVerifiableLog())
287287
{
288-
var sseTransport = new ServerSentEventsTransport(httpClient, LoggerFactory);
288+
var sseTransport = new ServerSentEventsTransport(httpClient, loggerFactory: LoggerFactory);
289289

290290
await sseTransport.StartAsync(
291291
new Uri("http://fakeuri.org"), TransferFormat.Text).DefaultTimeout();
@@ -339,7 +339,7 @@ public async Task SSETransportCancelsSendOnStop()
339339
using (var httpClient = new HttpClient(mockHttpHandler.Object))
340340
using (StartVerifiableLog())
341341
{
342-
var sseTransport = new ServerSentEventsTransport(httpClient, LoggerFactory);
342+
var sseTransport = new ServerSentEventsTransport(httpClient, loggerFactory: LoggerFactory);
343343

344344
await sseTransport.StartAsync(
345345
new Uri("http://fakeuri.org"), TransferFormat.Text).DefaultTimeout();
@@ -374,7 +374,7 @@ public async Task SSETransportDoesNotSupportBinary()
374374
using (var httpClient = new HttpClient(mockHttpHandler.Object))
375375
using (StartVerifiableLog())
376376
{
377-
var sseTransport = new ServerSentEventsTransport(httpClient, LoggerFactory);
377+
var sseTransport = new ServerSentEventsTransport(httpClient, loggerFactory: LoggerFactory);
378378

379379
var ex = await Assert.ThrowsAsync<ArgumentException>(() => sseTransport.StartAsync(new Uri("http://fakeuri.org"), TransferFormat.Binary).DefaultTimeout());
380380

@@ -401,7 +401,7 @@ public async Task SSETransportThrowsForInvalidTransferFormat(TransferFormat tran
401401
using (var httpClient = new HttpClient(mockHttpHandler.Object))
402402
using (StartVerifiableLog())
403403
{
404-
var sseTransport = new ServerSentEventsTransport(httpClient, LoggerFactory);
404+
var sseTransport = new ServerSentEventsTransport(httpClient, loggerFactory: LoggerFactory);
405405
var exception = await Assert.ThrowsAsync<ArgumentException>(() =>
406406
sseTransport.StartAsync(new Uri("http://fakeuri.org"), transferFormat));
407407

src/SignalR/clients/csharp/Http.Connections.Client/src/HttpConnectionFactory.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ internal static HttpConnectionOptions ShallowCopyHttpConnectionOptions(HttpConne
9292
AccessTokenProvider = options.AccessTokenProvider,
9393
CloseTimeout = options.CloseTimeout,
9494
DefaultTransferFormat = options.DefaultTransferFormat,
95+
ApplicationMaxBufferSize = options.ApplicationMaxBufferSize,
96+
TransportMaxBufferSize = options.TransportMaxBufferSize
9597
};
9698

9799
if (!OperatingSystem.IsBrowser())

src/SignalR/clients/csharp/Http.Connections.Client/src/HttpConnectionOptions.cs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using System;
55
using System.Collections.Generic;
6+
using System.IO.Pipelines;
67
using System.Net;
78
using System.Net.Http;
89
using System.Net.WebSockets;
@@ -26,6 +27,15 @@ public class HttpConnectionOptions
2627
private IWebProxy? _proxy;
2728
private bool? _useDefaultCredentials;
2829
private Action<ClientWebSocketOptions>? _webSocketConfiguration;
30+
private PipeOptions? _transportPipeOptions;
31+
private PipeOptions? _appPipeOptions;
32+
private long _transportMaxBufferSize;
33+
private long _applicationMaxBufferSize;
34+
35+
// Selected because of the number of client connections is usually much lower than
36+
// server connections and therefore willing to use more memory. We'll default
37+
// to a maximum of 1MB buffer;
38+
private const int DefaultBufferSize = 1 * 1024 * 1024;
2939

3040
/// <summary>
3141
/// Initializes a new instance of the <see cref="HttpConnectionOptions"/> class.
@@ -43,6 +53,8 @@ public HttpConnectionOptions()
4353
_cookies = new CookieContainer();
4454

4555
Transports = HttpTransports.All;
56+
TransportMaxBufferSize = DefaultBufferSize;
57+
ApplicationMaxBufferSize = DefaultBufferSize;
4658
}
4759

4860
/// <summary>
@@ -66,6 +78,59 @@ public IDictionary<string, string> Headers
6678
set => _headers = value ?? throw new ArgumentNullException(nameof(value));
6779
}
6880

81+
/// <summary>
82+
/// Gets or sets the maximum buffer size for data read by the application before backpressure is applied.
83+
/// </summary>
84+
/// <remarks>
85+
/// The default value is 1MB.
86+
/// </remarks>
87+
/// <value>
88+
/// The default value is 1MB.
89+
/// </value>
90+
public long TransportMaxBufferSize
91+
{
92+
get => _transportMaxBufferSize;
93+
set
94+
{
95+
if (value < 0)
96+
{
97+
throw new ArgumentOutOfRangeException(nameof(value));
98+
}
99+
100+
_transportMaxBufferSize = value;
101+
}
102+
}
103+
104+
/// <summary>
105+
/// Gets or sets the maximum buffer size for data written by the application before backpressure is applied.
106+
/// </summary>
107+
/// <remarks>
108+
/// The default value is 1MB.
109+
/// </remarks>
110+
/// <value>
111+
/// The default value is 1MB.
112+
/// </value>
113+
public long ApplicationMaxBufferSize
114+
{
115+
get => _applicationMaxBufferSize;
116+
set
117+
{
118+
if (value < 0)
119+
{
120+
throw new ArgumentOutOfRangeException(nameof(value));
121+
}
122+
123+
_applicationMaxBufferSize = value;
124+
}
125+
}
126+
127+
// We initialize these lazily based on the state of the options specified here.
128+
// Though these are mutable it's extremely rare that they would be mutated past the
129+
// call to initialize the routerware.
130+
internal PipeOptions TransportPipeOptions => _transportPipeOptions ??= new PipeOptions(pauseWriterThreshold: TransportMaxBufferSize, resumeWriterThreshold: TransportMaxBufferSize / 2, readerScheduler: PipeScheduler.ThreadPool, useSynchronizationContext: false);
131+
132+
internal PipeOptions AppPipeOptions => _appPipeOptions ??= new PipeOptions(pauseWriterThreshold: ApplicationMaxBufferSize, resumeWriterThreshold: ApplicationMaxBufferSize / 2, readerScheduler: PipeScheduler.ThreadPool, useSynchronizationContext: false);
133+
69134
/// <summary>
70135
/// Gets or sets a collection of client certificates that will be sent with HTTP requests.
71136
/// </summary>

src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/DefaultTransportFactory.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,13 @@ public ITransport CreateTransport(HttpTransportType availableServerTransports)
4949
if ((availableServerTransports & HttpTransportType.ServerSentEvents & _requestedTransportType) == HttpTransportType.ServerSentEvents)
5050
{
5151
// We don't need to give the transport the accessTokenProvider because the HttpClient has a message handler that does the work for us.
52-
return new ServerSentEventsTransport(_httpClient!, _loggerFactory);
52+
return new ServerSentEventsTransport(_httpClient!, _httpConnectionOptions, _loggerFactory);
5353
}
5454

5555
if ((availableServerTransports & HttpTransportType.LongPolling & _requestedTransportType) == HttpTransportType.LongPolling)
5656
{
5757
// We don't need to give the transport the accessTokenProvider because the HttpClient has a message handler that does the work for us.
58-
return new LongPollingTransport(_httpClient!, _loggerFactory);
58+
return new LongPollingTransport(_httpClient!, _httpConnectionOptions, _loggerFactory);
5959
}
6060

6161
throw new InvalidOperationException("No requested transports available on the server.");

src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/LongPollingTransport.cs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ internal partial class LongPollingTransport : ITransport
1818
{
1919
private readonly HttpClient _httpClient;
2020
private readonly ILogger _logger;
21+
private readonly HttpConnectionOptions _httpConnectionOptions;
2122
private IDuplexPipe? _application;
2223
private IDuplexPipe? _transport;
2324
// Volatile so that the poll loop sees the updated value set from a different thread
@@ -31,14 +32,11 @@ internal partial class LongPollingTransport : ITransport
3132

3233
public PipeWriter Output => _transport!.Output;
3334

34-
public LongPollingTransport(HttpClient httpClient)
35-
: this(httpClient, null)
36-
{ }
37-
38-
public LongPollingTransport(HttpClient httpClient, ILoggerFactory? loggerFactory)
35+
public LongPollingTransport(HttpClient httpClient, HttpConnectionOptions? httpConnectionOptions = null, ILoggerFactory? loggerFactory = null)
3936
{
4037
_httpClient = httpClient;
4138
_logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<LongPollingTransport>();
39+
_httpConnectionOptions = httpConnectionOptions ?? new();
4240
}
4341

4442
public async Task StartAsync(Uri url, TransferFormat transferFormat, CancellationToken cancellationToken = default)
@@ -59,8 +57,7 @@ public async Task StartAsync(Uri url, TransferFormat transferFormat, Cancellatio
5957
}
6058

6159
// Create the pipe pair (Application's writer is connected to Transport's reader, and vice versa)
62-
var options = ClientPipeOptions.DefaultOptions;
63-
var pair = DuplexPipe.CreateConnectionPair(options, options);
60+
var pair = DuplexPipe.CreateConnectionPair(_httpConnectionOptions.TransportPipeOptions, _httpConnectionOptions.AppPipeOptions);
6461

6562
_transport = pair.Transport;
6663
_application = pair.Application;

src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/ServerSentEventsTransport.cs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ internal partial class ServerSentEventsTransport : ITransport
1818
{
1919
private readonly HttpClient _httpClient;
2020
private readonly ILogger _logger;
21-
21+
private readonly HttpConnectionOptions _httpConnectionOptions;
2222
// Volatile so that the SSE loop sees the updated value set from a different thread
2323
private volatile Exception? _error;
2424
private readonly CancellationTokenSource _transportCts = new CancellationTokenSource();
@@ -33,11 +33,7 @@ internal partial class ServerSentEventsTransport : ITransport
3333

3434
public PipeWriter Output => _transport!.Output;
3535

36-
public ServerSentEventsTransport(HttpClient httpClient)
37-
: this(httpClient, null)
38-
{ }
39-
40-
public ServerSentEventsTransport(HttpClient httpClient, ILoggerFactory? loggerFactory)
36+
public ServerSentEventsTransport(HttpClient httpClient, HttpConnectionOptions? httpConnectionOptions = null, ILoggerFactory? loggerFactory = null)
4137
{
4238
if (httpClient == null)
4339
{
@@ -46,6 +42,7 @@ public ServerSentEventsTransport(HttpClient httpClient, ILoggerFactory? loggerFa
4642

4743
_httpClient = httpClient;
4844
_logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<ServerSentEventsTransport>();
45+
_httpConnectionOptions = httpConnectionOptions ?? new();
4946
}
5047

5148
public async Task StartAsync(Uri url, TransferFormat transferFormat, CancellationToken cancellationToken = default)
@@ -77,8 +74,7 @@ public async Task StartAsync(Uri url, TransferFormat transferFormat, Cancellatio
7774
}
7875

7976
// Create the pipe pair (Application's writer is connected to Transport's reader, and vice versa)
80-
var options = ClientPipeOptions.DefaultOptions;
81-
var pair = DuplexPipe.CreateConnectionPair(options, options);
77+
var pair = DuplexPipe.CreateConnectionPair(_httpConnectionOptions.TransportPipeOptions, _httpConnectionOptions.AppPipeOptions);
8278

8379
_transport = pair.Transport;
8480
_application = pair.Application;

0 commit comments

Comments
 (0)