-
Notifications
You must be signed in to change notification settings - Fork 10.4k
Pool SocketSenders #30771
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pool SocketSenders #30771
Changes from 1 commit
dc0b496
f4e75d2
1607af2
5067db4
11b8842
fe2d74c
704d9bc
b5aca36
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,8 @@ internal sealed class SocketConnection : TransportConnection | |
private readonly Socket _socket; | ||
private readonly ISocketsTrace _trace; | ||
private readonly SocketReceiver _receiver; | ||
private readonly SocketSender _sender; | ||
private SocketSender? _sender; | ||
private readonly SocketSenderPool _socketSenderPool; | ||
private readonly IDuplexPipe _originalTransport; | ||
private readonly CancellationTokenSource _connectionClosedTokenSource = new CancellationTokenSource(); | ||
|
||
|
@@ -36,6 +37,7 @@ internal SocketConnection(Socket socket, | |
MemoryPool<byte> memoryPool, | ||
PipeScheduler transportScheduler, | ||
ISocketsTrace trace, | ||
SocketSenderPool socketSenderPool, | ||
PipeOptions inputOptions, | ||
PipeOptions outputOptions, | ||
bool waitForData = true) | ||
|
@@ -48,8 +50,9 @@ internal SocketConnection(Socket socket, | |
MemoryPool = memoryPool; | ||
_trace = trace; | ||
_waitForData = waitForData; | ||
_socketSenderPool = socketSenderPool; | ||
|
||
LocalEndPoint = _socket.LocalEndPoint; | ||
LocalEndPoint = _socket.LocalEndPoint; | ||
davidfowl marked this conversation as resolved.
Show resolved
Hide resolved
|
||
RemoteEndPoint = _socket.RemoteEndPoint; | ||
|
||
ConnectionClosed = _connectionClosedTokenSource.Token; | ||
|
@@ -59,8 +62,7 @@ internal SocketConnection(Socket socket, | |
// https://github.com/aspnet/KestrelHttpServer/issues/2573 | ||
var awaiterScheduler = OperatingSystem.IsWindows() ? transportScheduler : PipeScheduler.Inline; | ||
|
||
_receiver = new SocketReceiver(_socket, awaiterScheduler); | ||
_sender = new SocketSender(_socket, awaiterScheduler); | ||
_receiver = new SocketReceiver(awaiterScheduler); | ||
|
||
var pair = DuplexPipe.CreateConnectionPair(inputOptions, outputOptions); | ||
|
||
|
@@ -93,7 +95,7 @@ private async Task StartAsync() | |
await sendTask; | ||
|
||
_receiver.Dispose(); | ||
_sender.Dispose(); | ||
_sender?.Dispose(); | ||
} | ||
catch (Exception ex) | ||
{ | ||
|
@@ -183,13 +185,13 @@ private async Task ProcessReceives() | |
if (_waitForData) | ||
{ | ||
// Wait for data before allocating a buffer. | ||
await _receiver.WaitForDataAsync(); | ||
await _receiver.WaitForDataAsync(_socket); | ||
} | ||
|
||
// Ensure we have some reasonable amount of buffer space | ||
var buffer = input.GetMemory(MinAllocBufferSize); | ||
|
||
var bytesReceived = await _receiver.ReceiveAsync(buffer); | ||
var bytesReceived = await _receiver.ReceiveAsync(_socket, buffer); | ||
|
||
if (bytesReceived == 0) | ||
{ | ||
|
@@ -282,7 +284,10 @@ private async Task ProcessSends() | |
var isCompleted = result.IsCompleted; | ||
if (!buffer.IsEmpty) | ||
{ | ||
await _sender.SendAsync(buffer); | ||
_sender = _socketSenderPool.Rent(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the perf impact of renting and returning senders to the pool? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In my tests, I haven't seen any impact on throughput. I ran plaintext though. I'm not sure of a case where this would show up being a real problem. The contention should be low as the queues are sharded. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm a little confused why we need to rent and return for each send? Can't we rent once at the beginning of ProcessSends and return after (and calling Reset accordingly). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The entire purpose of the change is to avoid that. Today we have one per connection, this reduces it from one per connection to one per send operation, hence why we go from 5000 in the test below to 8. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see. So this reduces allocations with a pool as well as reducing the amount of time an object is used for. If we have a limit of 1024, couldn't that be worse though when you get a sudden increase in traffic and then reduces? Is that something to be concerned about here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Realistically for that to happen, you would need to cause the TCP buffers in the kernel to fill up per connection by sending massive payloads and having the client not read them fast enough. The only way the number of concurrent operations exceeds the number of executing sends is when the sends themselves go async and this is very rare. That said, the SocketSender is about 300 bytes a pop and we default to a maximum of 16 IOQueues. If by some glitch we end up with 1024 entries per IOQueue then we'd have 4MB of unfreeable memory. |
||
await _sender.SendAsync(_socket, buffer); | ||
_socketSenderPool.Return(_sender); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What happens if SendAsync errors? Should the return be in a try/finally? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If it errors, it's disposed (further up the stack where exceptions are caught). No it shouldn't bein a try/finally, we don't need to return things to the pool in the error case. |
||
_sender = null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not make the _sender field a local var? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So I can dispose the sender in the error case. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need some comments explaining why. Is there a test for it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't mock the networking stack part of the sockets transport. There's just no need to reuse things in the face of exception, but I'll add a comment.
I don't see the value in making sure this gets disposed by trying to mock the SocketSender. We don't have tests making sure that the Socket itself is disposed. |
||
} | ||
|
||
output.AdvanceTo(end); | ||
|
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,55 @@ | ||||||||
using System; | ||||||||
BrennanConroy marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||
using System.Collections.Concurrent; | ||||||||
using System.IO.Pipelines; | ||||||||
|
||||||||
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal | ||||||||
{ | ||||||||
internal class SocketSenderPool : IDisposable | ||||||||
{ | ||||||||
private const int MaxQueueSize = 1024; // REVIEW: Is this good enough? | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did you look to see how many were created with 5000 websockets? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, 8 on my machine (8 cores). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 1024 seems high to me. Is this the maximum number of sends occurring at the same time? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
What number do you like? A multiple of the number of cores? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, this is a maximum, it doesn't mean we start with this many or that we'll have this many. It means we won't got past this many. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't want to be strict. I'll pick whatever number ya'll want though as long as we don't introduce locking into this code path. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can be strict without locking. You can use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK I've complicated the code. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, there's no way that I can think of to make this accurate without locking. Review the code and let me know. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
|
||||||||
private readonly ConcurrentQueue<SocketSender> _queue = new(); | ||||||||
private readonly PipeScheduler _scheduler; | ||||||||
private bool _disposed; | ||||||||
|
||||||||
public SocketSenderPool(PipeScheduler scheduler) | ||||||||
{ | ||||||||
_scheduler = scheduler; | ||||||||
} | ||||||||
|
||||||||
public SocketSender Rent() | ||||||||
{ | ||||||||
if (_queue.TryDequeue(out var sender)) | ||||||||
{ | ||||||||
return sender; | ||||||||
} | ||||||||
return new SocketSender(_scheduler); | ||||||||
} | ||||||||
|
||||||||
public void Return(SocketSender sender) | ||||||||
{ | ||||||||
if (_disposed || _queue.Count > MaxQueueSize) | ||||||||
davidfowl marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||
{ | ||||||||
sender.Dispose(); | ||||||||
return; | ||||||||
} | ||||||||
|
||||||||
sender.Reset(); | ||||||||
|
||||||||
_queue.Enqueue(sender); | ||||||||
} | ||||||||
|
||||||||
public void Dispose() | ||||||||
{ | ||||||||
if (!_disposed) | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is thread safety an issue when disposing? What happens if there is a sender that hasn't been returned to the queue? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's not a concern. It'll probably leak. We have that same issue with the memory pool. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To clarify, in the face of ungraceful shutdown, bad things can happen with both the memory pool and now this pool. |
||||||||
{ | ||||||||
_disposed = true; | ||||||||
while (_queue.TryDequeue(out var sender)) | ||||||||
{ | ||||||||
sender.Dispose(); | ||||||||
} | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
davidfowl marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||
} | ||||||||
} |
This file was deleted.
Uh oh!
There was an error while loading. Please reload this page.