Skip to content

Commit 1a821c6

Browse files
committed
Consolidate server and transport shutdown logic
1 parent 47fe637 commit 1a821c6

File tree

7 files changed

+75
-110
lines changed

7 files changed

+75
-110
lines changed

src/Servers/Kestrel/Core/src/Internal/Infrastructure/ConnectionManager.cs

Lines changed: 0 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -74,62 +74,6 @@ public void Walk(Action<KestrelConnection> callback)
7474
}
7575
}
7676

77-
public Task<bool> CloseAllConnectionsAsync(CancellationToken token)
78-
{
79-
return CloseAllConnectionsAsync(_connectionReferences, token);
80-
}
81-
82-
public Task<bool> AbortAllConnectionsAsync()
83-
{
84-
return AbortAllConnectionsAsync(_connectionReferences);
85-
}
86-
87-
internal static async Task<bool> CloseAllConnectionsAsync(ConcurrentDictionary<long, ConnectionReference> connectionReferences, CancellationToken token)
88-
{
89-
var closeTasks = new List<Task>();
90-
91-
foreach (var kvp in connectionReferences)
92-
{
93-
if (kvp.Value.TryGetConnection(out var connection))
94-
{
95-
connection.RequestClose();
96-
closeTasks.Add(connection.ExecutionTask);
97-
}
98-
}
99-
100-
var allClosedTask = Task.WhenAll(closeTasks.ToArray());
101-
return await Task.WhenAny(allClosedTask, CancellationTokenAsTask(token)).ConfigureAwait(false) == allClosedTask;
102-
}
103-
104-
internal static async Task<bool> AbortAllConnectionsAsync(ConcurrentDictionary<long, ConnectionReference> connectionReferences)
105-
{
106-
var abortTasks = new List<Task>();
107-
108-
foreach (var kvp in connectionReferences)
109-
{
110-
if (kvp.Value.TryGetConnection(out var connection))
111-
{
112-
connection.TransportConnection.Abort(new ConnectionAbortedException(CoreStrings.ConnectionAbortedDuringServerShutdown));
113-
abortTasks.Add(connection.ExecutionTask);
114-
}
115-
}
116-
117-
var allAbortedTask = Task.WhenAll(abortTasks.ToArray());
118-
return await Task.WhenAny(allAbortedTask, Task.Delay(1000)).ConfigureAwait(false) == allAbortedTask;
119-
}
120-
121-
private static Task CancellationTokenAsTask(CancellationToken token)
122-
{
123-
if (token.IsCancellationRequested)
124-
{
125-
return Task.CompletedTask;
126-
}
127-
128-
var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
129-
token.Register(() => tcs.SetResult(null));
130-
return tcs.Task;
131-
}
132-
13377
private static ResourceCounter GetCounter(long? number)
13478
=> number.HasValue
13579
? ResourceCounter.Quota(number.Value)

src/Servers/Kestrel/Core/src/Internal/Infrastructure/TransportConnectionManager.cs

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55

66
using System;
77
using System.Collections.Concurrent;
8+
using System.Collections.Generic;
89
using System.Threading;
910
using System.Threading.Tasks;
11+
using Microsoft.AspNetCore.Connections;
1012

1113
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
1214
{
@@ -52,14 +54,50 @@ public void StopTracking(long id)
5254
}
5355
}
5456

55-
public Task<bool> CloseAllConnectionsAsync(CancellationToken token)
57+
public async Task<bool> CloseAllConnectionsAsync(CancellationToken token)
5658
{
57-
return ConnectionManager.CloseAllConnectionsAsync(_connectionReferences, token);
59+
var closeTasks = new List<Task>();
60+
61+
foreach (var kvp in _connectionReferences)
62+
{
63+
if (kvp.Value.TryGetConnection(out var connection))
64+
{
65+
connection.RequestClose();
66+
closeTasks.Add(connection.ExecutionTask);
67+
}
68+
}
69+
70+
var allClosedTask = Task.WhenAll(closeTasks.ToArray());
71+
return await Task.WhenAny(allClosedTask, CancellationTokenAsTask(token)).ConfigureAwait(false) == allClosedTask;
72+
}
73+
74+
public async Task<bool> AbortAllConnectionsAsync()
75+
{
76+
var abortTasks = new List<Task>();
77+
78+
foreach (var kvp in _connectionReferences)
79+
{
80+
if (kvp.Value.TryGetConnection(out var connection))
81+
{
82+
connection.TransportConnection.Abort(new ConnectionAbortedException(CoreStrings.ConnectionAbortedDuringServerShutdown));
83+
abortTasks.Add(connection.ExecutionTask);
84+
}
85+
}
86+
87+
var allAbortedTask = Task.WhenAll(abortTasks.ToArray());
88+
return await Task.WhenAny(allAbortedTask, Task.Delay(1000)).ConfigureAwait(false) == allAbortedTask;
5889
}
5990

60-
public Task<bool> AbortAllConnectionsAsync()
91+
private static Task CancellationTokenAsTask(CancellationToken token)
6192
{
62-
return ConnectionManager.AbortAllConnectionsAsync(_connectionReferences);
93+
if (token.IsCancellationRequested)
94+
{
95+
return Task.CompletedTask;
96+
}
97+
98+
var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
99+
token.Register(() => tcs.SetResult(null!));
100+
return tcs.Task;
63101
}
64102
}
65103
}

src/Servers/Kestrel/Core/src/Internal/Infrastructure/TransportManager.cs

Lines changed: 10 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,19 @@ private void StartAcceptLoop<T>(IConnectionListener<T> connectionListener, Func<
6767
_transports.Add(new ActiveTransport(connectionListener, acceptLoopTask, transportConnectionManager, endpointConfig));
6868
}
6969

70-
public async Task StopEndpointsAsync(List<EndpointConfig> endpointsToStop, CancellationToken cancellationToken)
70+
public Task StopEndpointsAsync(List<EndpointConfig> endpointsToStop, CancellationToken cancellationToken)
7171
{
7272
var transportsToStop = _transports.Where(t => t.EndpointConfig != null && endpointsToStop.Contains(t.EndpointConfig)).ToList();
73+
return StopTransportsAsync(transportsToStop, cancellationToken);
74+
}
75+
76+
public Task StopAsync(CancellationToken cancellationToken)
77+
{
78+
return StopTransportsAsync(new List<ActiveTransport>(_transports), cancellationToken);
79+
}
7380

81+
private async Task StopTransportsAsync(List<ActiveTransport> transportsToStop, CancellationToken cancellationToken)
82+
{
7483
var tasks = new Task[transportsToStop.Count];
7584

7685
for (int i = 0; i < transportsToStop.Count; i++)
@@ -113,37 +122,6 @@ async Task StopTransportConnection(ActiveTransport transport)
113122
}
114123
}
115124

116-
public async Task StopAsync(CancellationToken cancellationToken)
117-
{
118-
var tasks = new Task[_transports.Count];
119-
120-
for (int i = 0; i < _transports.Count; i++)
121-
{
122-
tasks[i] = _transports[i].UnbindAsync(cancellationToken);
123-
}
124-
125-
await Task.WhenAll(tasks).ConfigureAwait(false);
126-
127-
if (!await ConnectionManager.CloseAllConnectionsAsync(cancellationToken).ConfigureAwait(false))
128-
{
129-
Trace.NotAllConnectionsClosedGracefully();
130-
131-
if (!await ConnectionManager.AbortAllConnectionsAsync().ConfigureAwait(false))
132-
{
133-
Trace.NotAllConnectionsAborted();
134-
}
135-
}
136-
137-
for (int i = 0; i < _transports.Count; i++)
138-
{
139-
tasks[i] = _transports[i].DisposeAsync().AsTask();
140-
}
141-
142-
await Task.WhenAll(tasks).ConfigureAwait(false);
143-
144-
_transports.Clear();
145-
}
146-
147125
private class ActiveTransport : IAsyncDisposable
148126
{
149127
public ActiveTransport(IConnectionListenerBase transport, Task acceptLoopTask, TransportConnectionManager transportConnectionManager, EndpointConfig? endpointConfig = null)
@@ -163,7 +141,6 @@ public ActiveTransport(IConnectionListenerBase transport, Task acceptLoopTask, T
163141
public async Task UnbindAsync(CancellationToken cancellationToken)
164142
{
165143
await ConnectionListener.UnbindAsync(cancellationToken).ConfigureAwait(false);
166-
// TODO: Stop awaiting this if cancellationToken fires
167144
await AcceptLoopTask.ConfigureAwait(false);
168145
}
169146

src/Servers/Kestrel/Core/src/KestrelConfigurationLoader.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -244,18 +244,18 @@ public void Load()
244244
// Any endpoints that were removed from the last time endpoints were loaded are returned.
245245
internal (List<ListenOptions>, List<ListenOptions>) Reload()
246246
{
247+
var endpointsToStop = Options.ConfigurationBackedListenOptions.ToList();
248+
var endpointsToStart = new List<ListenOptions>();
249+
250+
Options.ConfigurationBackedListenOptions.Clear();
251+
DefaultCertificateConfig = null;
252+
247253
ConfigurationReader = new ConfigurationReader(Configuration);
248254

249255
Options.Latin1RequestHeaders = ConfigurationReader.Latin1RequestHeaders;
250256

251-
DefaultCertificateConfig = null;
252257
LoadDefaultCert(ConfigurationReader);
253258

254-
var endpointsToStop = Options.ConfigurationBackedListenOptions.ToList();
255-
var endpointsToStart = new List<ListenOptions>();
256-
257-
Options.ConfigurationBackedListenOptions.Clear();
258-
259259
foreach (var endpoint in ConfigurationReader.Endpoints)
260260
{
261261
var listenOptions = AddressBinder.ParseAddress(endpoint.Url, out var https);

src/Servers/Kestrel/Core/src/KestrelServer.cs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core
2323
public class KestrelServer : IServer
2424
{
2525
private readonly IServerAddressesFeature _serverAddresses;
26-
private readonly List<IConnectionListenerFactory> _transportFactories;
27-
private readonly List<IMultiplexedConnectionListenerFactory> _multiplexedTransportFactories;
2826
private readonly TransportManager _transportManager;
27+
private readonly IConnectionListenerFactory _transportFactory;
28+
private readonly IMultiplexedConnectionListenerFactory _multiplexedTransportFactory;
2929

3030
private readonly SemaphoreSlim _bindSemaphore = new SemaphoreSlim(initialCount: 1);
3131
private bool _hasStarted;
@@ -59,10 +59,10 @@ internal KestrelServer(IEnumerable<IConnectionListenerFactory> transportFactorie
5959
throw new ArgumentNullException(nameof(transportFactories));
6060
}
6161

62-
_transportFactories = transportFactories.ToList();
63-
_multiplexedTransportFactories = multiplexedFactories?.ToList();
62+
_transportFactory = transportFactories?.LastOrDefault();
63+
_multiplexedTransportFactory = multiplexedFactories?.LastOrDefault();
6464

65-
if (_transportFactories.Count == 0 && (_multiplexedTransportFactories == null || _multiplexedTransportFactories.Count == 0))
65+
if (_transportFactory == null && _multiplexedTransportFactory == null)
6666
{
6767
throw new InvalidOperationException(CoreStrings.TransportNotFound);
6868
}
@@ -73,7 +73,7 @@ internal KestrelServer(IEnumerable<IConnectionListenerFactory> transportFactorie
7373
_serverAddresses = new ServerAddressesFeature();
7474
Features.Set(_serverAddresses);
7575

76-
_transportManager = new TransportManager(transportFactories?.LastOrDefault(), multiplexedFactories?.LastOrDefault(), ServiceContext);
76+
_transportManager = new TransportManager(_transportFactory, _multiplexedTransportFactory, ServiceContext);
7777

7878
HttpCharacters.Initialize();
7979
}
@@ -154,7 +154,7 @@ async Task OnBind(ListenOptions options)
154154
// sockets for it to successfully listen. It also seems racy.
155155
if ((options.Protocols & HttpProtocols.Http3) == HttpProtocols.Http3)
156156
{
157-
if (_multiplexedTransportFactories == null || _multiplexedTransportFactories.Count == 0)
157+
if (_multiplexedTransportFactory is null)
158158
{
159159
throw new InvalidOperationException($"Cannot start HTTP/3 server if no {nameof(IMultiplexedConnectionListenerFactory)} is registered.");
160160
}
@@ -171,6 +171,11 @@ async Task OnBind(ListenOptions options)
171171
|| options.Protocols == HttpProtocols.None) // TODO a test fails because it doesn't throw an exception in the right place
172172
// when there is no HttpProtocols in KestrelServer, can we remove/change the test?
173173
{
174+
if (_transportFactory is null)
175+
{
176+
throw new InvalidOperationException($"Cannot start HTTP/1.x or HTTP/2 server if no {nameof(IConnectionListenerFactory)} is registered.");
177+
}
178+
174179
options.UseHttpServer(ServiceContext, application, options.Protocols);
175180
var connectionDelegate = options.Build();
176181

src/Servers/Kestrel/Core/src/KestrelServerOptions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public class KestrelServerOptions
2424
{
2525
// The following two lists configure the endpoints that Kestrel should listen toIf both lists is empty, the server.urls setting (e.g. UseUrls) is used.
2626
internal List<ListenOptions> CodeBackedListenOptions { get; } = new List<ListenOptions>();
27-
internal List<ListenOptions> ConfigurationBackedListenOptions { get; set; } = new List<ListenOptions>();
27+
internal List<ListenOptions> ConfigurationBackedListenOptions { get; } = new List<ListenOptions>();
2828
internal IEnumerable<ListenOptions> ListenOptions => CodeBackedListenOptions.Concat(ConfigurationBackedListenOptions);
2929

3030
// For testing and debugging.

src/Servers/Kestrel/Transport.Libuv/test/LibuvTransportTests.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,8 @@ public async Task OneToTenThreads(int threadCount)
198198
await transport.BindAsync();
199199
listenOptions.EndPoint = transport.EndPoint;
200200

201-
var dispatcher = new ConnectionDispatcher<ConnectionContext>(serviceContext, c => listenOptions.Build()(c), new TransportConnectionManager(serviceContext.ConnectionManager));
201+
var transportConnectionManager = new TransportConnectionManager(serviceContext.ConnectionManager);
202+
var dispatcher = new ConnectionDispatcher<ConnectionContext>(serviceContext, c => listenOptions.Build()(c), transportConnectionManager);
202203
var acceptTask = dispatcher.StartAcceptingConnections(new GenericConnectionListener(transport));
203204

204205
using (var client = new HttpClient())
@@ -221,9 +222,9 @@ public async Task OneToTenThreads(int threadCount)
221222

222223
await acceptTask;
223224

224-
if (!await serviceContext.ConnectionManager.CloseAllConnectionsAsync(default))
225+
if (!await transportConnectionManager.CloseAllConnectionsAsync(default))
225226
{
226-
await serviceContext.ConnectionManager.AbortAllConnectionsAsync();
227+
await transportConnectionManager.AbortAllConnectionsAsync();
227228
}
228229
}
229230

0 commit comments

Comments
 (0)