|
7 | 7 | using System.Net.Quic;
|
8 | 8 | using System.Text;
|
9 | 9 | using System.Threading.Tasks;
|
| 10 | +using Microsoft.AspNetCore.Connections; |
10 | 11 | using Microsoft.AspNetCore.Connections.Features;
|
11 | 12 | using Microsoft.AspNetCore.Server.Kestrel.FunctionalTests;
|
12 | 13 | using Microsoft.AspNetCore.Server.Kestrel.Transport.Quic.Internal;
|
@@ -223,6 +224,94 @@ public async Task StreamPool_Heartbeat_ExpiredStreamRemoved()
|
223 | 224 | Assert.Equal(0, quicConnectionContext.StreamPool.Count);
|
224 | 225 | }
|
225 | 226 |
|
| 227 | + [ConditionalFact] |
| 228 | + [MsQuicSupported] |
| 229 | + public async Task StreamPool_ManyConcurrentStreams_StreamPoolFull() |
| 230 | + { |
| 231 | + // Arrange |
| 232 | + await using var connectionListener = await QuicTestHelpers.CreateConnectionListenerFactory(LoggerFactory); |
| 233 | + |
| 234 | + var options = QuicTestHelpers.CreateClientConnectionOptions(connectionListener.EndPoint); |
| 235 | + using var clientConnection = new QuicConnection(QuicImplementationProviders.MsQuic, options); |
| 236 | + await clientConnection.ConnectAsync().DefaultTimeout(); |
| 237 | + |
| 238 | + await using var serverConnection = await connectionListener.AcceptAsync().DefaultTimeout(); |
| 239 | + |
| 240 | + var testHeartbeatFeature = new TestHeartbeatFeature(); |
| 241 | + serverConnection.Features.Set<IConnectionHeartbeatFeature>(testHeartbeatFeature); |
| 242 | + |
| 243 | + // Act |
| 244 | + var quicConnectionContext = Assert.IsType<QuicConnectionContext>(serverConnection); |
| 245 | + Assert.Equal(0, quicConnectionContext.StreamPool.Count); |
| 246 | + |
| 247 | + var pauseCompleteTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); |
| 248 | + var allConnectionsOnServerTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); |
| 249 | + var streamTasks = new List<Task>(); |
| 250 | + var requestState = new RequestState(clientConnection, serverConnection, allConnectionsOnServerTcs, pauseCompleteTcs.Task); |
| 251 | + |
| 252 | + const int StreamsSent = 101; |
| 253 | + for (var i = 0; i < StreamsSent; i++) |
| 254 | + { |
| 255 | + // TODO: Race condition in QUIC library. |
| 256 | + // Delay between sending streams to avoid |
| 257 | + // https://github.com/dotnet/runtime/issues/55249 |
| 258 | + await Task.Delay(50); |
| 259 | + streamTasks.Add(SendStream(requestState)); |
| 260 | + } |
| 261 | + |
| 262 | + await allConnectionsOnServerTcs.Task.DefaultTimeout(); |
| 263 | + pauseCompleteTcs.SetResult(); |
| 264 | + |
| 265 | + await Task.WhenAll(streamTasks).DefaultTimeout(); |
| 266 | + |
| 267 | + // Assert |
| 268 | + // Up to 100 streams are pooled. |
| 269 | + Assert.Equal(100, quicConnectionContext.StreamPool.Count); |
| 270 | + |
| 271 | + static async Task SendStream(RequestState requestState) |
| 272 | + { |
| 273 | + var clientStream = requestState.QuicConnection.OpenBidirectionalStream(); |
| 274 | + await clientStream.WriteAsync(TestData, endStream: true).DefaultTimeout(); |
| 275 | + var serverStream = await requestState.ServerConnection.AcceptAsync().DefaultTimeout(); |
| 276 | + var readResult = await serverStream.Transport.Input.ReadAtLeastAsync(TestData.Length).DefaultTimeout(); |
| 277 | + serverStream.Transport.Input.AdvanceTo(readResult.Buffer.End); |
| 278 | + |
| 279 | + // Input should be completed. |
| 280 | + readResult = await serverStream.Transport.Input.ReadAsync(); |
| 281 | + Assert.True(readResult.IsCompleted); |
| 282 | + |
| 283 | + lock (requestState) |
| 284 | + { |
| 285 | + requestState.ActiveConcurrentConnections++; |
| 286 | + if (requestState.ActiveConcurrentConnections == StreamsSent) |
| 287 | + { |
| 288 | + requestState.AllConnectionsOnServerTcs.SetResult(); |
| 289 | + } |
| 290 | + } |
| 291 | + |
| 292 | + await requestState.PauseCompleteTask; |
| 293 | + |
| 294 | + // Complete reading and writing. |
| 295 | + await serverStream.Transport.Input.CompleteAsync(); |
| 296 | + await serverStream.Transport.Output.CompleteAsync(); |
| 297 | + |
| 298 | + var quicStreamContext = Assert.IsType<QuicStreamContext>(serverStream); |
| 299 | + |
| 300 | + // Both send and receive loops have exited. |
| 301 | + await quicStreamContext._processingTask.DefaultTimeout(); |
| 302 | + await quicStreamContext.DisposeAsync(); |
| 303 | + } |
| 304 | + } |
| 305 | + |
| 306 | + private record RequestState( |
| 307 | + QuicConnection QuicConnection, |
| 308 | + MultiplexedConnectionContext ServerConnection, |
| 309 | + TaskCompletionSource AllConnectionsOnServerTcs, |
| 310 | + Task PauseCompleteTask) |
| 311 | + { |
| 312 | + public int ActiveConcurrentConnections { get; set; } |
| 313 | + }; |
| 314 | + |
226 | 315 | private class TestSystemClock : ISystemClock
|
227 | 316 | {
|
228 | 317 | public DateTimeOffset UtcNow { get; set; }
|
|
0 commit comments