-
Notifications
You must be signed in to change notification settings - Fork 10.4k
Pool SocketSenders #30771
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pool SocketSenders #30771
Changes from 5 commits
dc0b496
f4e75d2
1607af2
5067db4
11b8842
fe2d74c
704d9bc
b5aca36
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,8 @@ internal sealed class SocketConnection : TransportConnection | |
private readonly Socket _socket; | ||
private readonly ISocketsTrace _trace; | ||
private readonly SocketReceiver _receiver; | ||
private readonly SocketSender _sender; | ||
private SocketSender? _sender; | ||
private readonly SocketSenderPool _socketSenderPool; | ||
private readonly IDuplexPipe _originalTransport; | ||
private readonly CancellationTokenSource _connectionClosedTokenSource = new CancellationTokenSource(); | ||
|
||
|
@@ -36,6 +37,7 @@ internal SocketConnection(Socket socket, | |
MemoryPool<byte> memoryPool, | ||
PipeScheduler transportScheduler, | ||
ISocketsTrace trace, | ||
SocketSenderPool socketSenderPool, | ||
PipeOptions inputOptions, | ||
PipeOptions outputOptions, | ||
bool waitForData = true) | ||
|
@@ -48,6 +50,7 @@ internal SocketConnection(Socket socket, | |
MemoryPool = memoryPool; | ||
_trace = trace; | ||
_waitForData = waitForData; | ||
_socketSenderPool = socketSenderPool; | ||
|
||
LocalEndPoint = _socket.LocalEndPoint; | ||
RemoteEndPoint = _socket.RemoteEndPoint; | ||
|
@@ -59,8 +62,7 @@ internal SocketConnection(Socket socket, | |
// https://github.com/aspnet/KestrelHttpServer/issues/2573 | ||
var awaiterScheduler = OperatingSystem.IsWindows() ? transportScheduler : PipeScheduler.Inline; | ||
|
||
_receiver = new SocketReceiver(_socket, awaiterScheduler); | ||
_sender = new SocketSender(_socket, awaiterScheduler); | ||
_receiver = new SocketReceiver(awaiterScheduler); | ||
|
||
var pair = DuplexPipe.CreateConnectionPair(inputOptions, outputOptions); | ||
|
||
|
@@ -93,7 +95,7 @@ private async Task StartAsync() | |
await sendTask; | ||
|
||
_receiver.Dispose(); | ||
_sender.Dispose(); | ||
_sender?.Dispose(); | ||
} | ||
catch (Exception ex) | ||
{ | ||
|
@@ -183,13 +185,13 @@ private async Task ProcessReceives() | |
if (_waitForData) | ||
{ | ||
// Wait for data before allocating a buffer. | ||
await _receiver.WaitForDataAsync(); | ||
await _receiver.WaitForDataAsync(_socket); | ||
} | ||
|
||
// Ensure we have some reasonable amount of buffer space | ||
var buffer = input.GetMemory(MinAllocBufferSize); | ||
|
||
var bytesReceived = await _receiver.ReceiveAsync(buffer); | ||
var bytesReceived = await _receiver.ReceiveAsync(_socket, buffer); | ||
|
||
if (bytesReceived == 0) | ||
{ | ||
|
@@ -282,7 +284,12 @@ private async Task ProcessSends() | |
var isCompleted = result.IsCompleted; | ||
if (!buffer.IsEmpty) | ||
{ | ||
await _sender.SendAsync(buffer); | ||
_sender = _socketSenderPool.Rent(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the perf impact of renting and returning senders to the pool? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In my tests, I haven't seen any impact on throughput. I ran plaintext though. I'm not sure of a case where this would show up being a real problem. The contention should be low as the queues are sharded. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm a little confused why we need to rent and return for each send? Can't we rent once at the beginning of ProcessSends and return after (and calling Reset accordingly). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The entire purpose of the change is to avoid that. Today we have one per connection, this reduces it from one per connection to one per send operation, hence why we go from 5000 in the test below to 8. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see. So this reduces allocations with a pool as well as reducing the amount of time an object is used for. If we have a limit of 1024, couldn't that be worse though when you get a sudden increase in traffic and then reduces? Is that something to be concerned about here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 
Realistically for that to happen, you would need to cause the TCP buffers in the kernel to fill up per connection by sending massive payloads and having the client not read them fast enough. The only way the number of concurrent operations exceeds the number of executing sends is when the sends themselves go async and this is very rare. That said, the SocketSender is about 300 bytes a pop and we default to a maximum of 16 IOQueues. If by some glitch we end up with 1024 entries per IOQueue then we'd have 4MB of unfreeable memory. |
||
await _sender.SendAsync(_socket, buffer); | ||
// We don't return to the pool if there was an exception, and | ||
// we keep the _sender assigned so that we can dispose it in StartAsync. | ||
_socketSenderPool.Return(_sender); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What happens if SendAsync errors? Should the return be in a try/finally? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If it errors, it's disposed (further up the stack where exceptions are caught). No it shouldn't be in a try/finally, we don't need to return things to the pool in the error case. |
||
_sender = null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not make the _sender field a local var? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So I can dispose the sender in the error case. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need some comments explaining why. Is there a test for it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't mock the networking stack part of the sockets transport. There's just no need to reuse things in the face of exception, but I'll add a comment.
I don't see the value in making sure this gets disposed by trying to mock the SocketSender. We don't have tests making sure that the Socket itself is disposed. |
||
} | ||
|
||
output.AdvanceTo(end); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,55 +11,61 @@ | |
|
||
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal | ||
{ | ||
internal sealed class SocketSender : SocketSenderReceiverBase | ||
internal sealed class SocketSender : SocketAwaitableEventArgs | ||
{ | ||
private List<ArraySegment<byte>>? _bufferList; | ||
|
||
public SocketSender(Socket socket, PipeScheduler scheduler) : base(socket, scheduler) | ||
public SocketSender(PipeScheduler scheduler) : base(scheduler) | ||
{ | ||
} | ||
|
||
public SocketAwaitableEventArgs SendAsync(in ReadOnlySequence<byte> buffers) | ||
public SocketAwaitableEventArgs SendAsync(Socket socket, in ReadOnlySequence<byte> buffers) | ||
{ | ||
if (buffers.IsSingleSegment) | ||
{ | ||
return SendAsync(buffers.First); | ||
return SendAsync(socket, buffers.First); | ||
} | ||
|
||
if (!_awaitableEventArgs.MemoryBuffer.Equals(Memory<byte>.Empty)) | ||
{ | ||
_awaitableEventArgs.SetBuffer(null, 0, 0); | ||
} | ||
|
||
_awaitableEventArgs.BufferList = GetBufferList(buffers); | ||
SetBufferList(buffers); | ||
|
||
if (!_socket.SendAsync(_awaitableEventArgs)) | ||
if (!socket.SendAsync(this)) | ||
{ | ||
_awaitableEventArgs.Complete(); | ||
Complete(); | ||
} | ||
|
||
return _awaitableEventArgs; | ||
return this; | ||
} | ||
|
||
private SocketAwaitableEventArgs SendAsync(ReadOnlyMemory<byte> memory) | ||
public void Reset() | ||
{ | ||
// The BufferList getter is much less expensive than the setter. | ||
if (_awaitableEventArgs.BufferList != null) | ||
// We clear the buffer and buffer list before we put it back into the pool | ||
// it's a small performance hit but it removes the confusion when looking at dumps to see this still | ||
// holds onto the buffer when it's back in the pool | ||
if (BufferList != null) | ||
{ | ||
BufferList = null; | ||
} | ||
else | ||
{ | ||
_awaitableEventArgs.BufferList = null; | ||
SetBuffer(null, 0, 0); | ||
} | ||
|
||
_awaitableEventArgs.SetBuffer(MemoryMarshal.AsMemory(memory)); | ||
_bufferList?.Clear(); | ||
davidfowl marked this conversation as resolved.
Show resolved
Hide resolved
davidfowl marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
private SocketAwaitableEventArgs SendAsync(Socket socket, ReadOnlyMemory<byte> memory) | ||
{ | ||
SetBuffer(MemoryMarshal.AsMemory(memory)); | ||
|
||
if (!_socket.SendAsync(_awaitableEventArgs)) | ||
if (!socket.SendAsync(this)) | ||
{ | ||
_awaitableEventArgs.Complete(); | ||
Complete(); | ||
} | ||
|
||
return _awaitableEventArgs; | ||
return this; | ||
} | ||
|
||
private List<ArraySegment<byte>> GetBufferList(in ReadOnlySequence<byte> buffer) | ||
private void SetBufferList(in ReadOnlySequence<byte> buffer) | ||
{ | ||
Debug.Assert(!buffer.IsEmpty); | ||
Debug.Assert(!buffer.IsSingleSegment); | ||
|
@@ -68,18 +74,14 @@ private List<ArraySegment<byte>> GetBufferList(in ReadOnlySequence<byte> buffer) | |
{ | ||
_bufferList = new List<ArraySegment<byte>>(); | ||
} | ||
else | ||
{ | ||
// Buffers are pooled, so it's OK to root them until the next multi-buffer write. | ||
_bufferList.Clear(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How much memory is expected to be used in the bufferlist? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are you asking how big the list will get? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As we aren't clearing here, how much idle memory will be in the bufferlist? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We clear in Return. |
||
} | ||
|
||
foreach (var b in buffer) | ||
{ | ||
_bufferList.Add(b.GetArray()); | ||
} | ||
|
||
return _bufferList; | ||
// The act of setting this list, sets the buffers in the internal buffer list | ||
BufferList = _bufferList; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,76 @@ | ||||||||
// Copyright (c) .NET Foundation. All rights reserved. | ||||||||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. | ||||||||
|
||||||||
using System; | ||||||||
BrennanConroy marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||
using System.Collections.Concurrent; | ||||||||
using System.IO.Pipelines; | ||||||||
using System.Threading; | ||||||||
|
||||||||
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal | ||||||||
{ | ||||||||
internal class SocketSenderPool : IDisposable | ||||||||
{ | ||||||||
private const int MaxQueueSize = 1024; // REVIEW: Is this good enough? | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did you look to see how many were created with 5000 websockets? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, 8 on my machine (8 cores). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 1024 seems high to me. Is this the maximum number of sends occurring at the same time? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
What number do you like? A multiple of the number of cores? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, this is a maximum, it doesn't mean we start with this many or that we'll have this many. It means we won't go past this many. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't want to be strict. I'll pick whatever number y'all want though as long as we don't introduce locking into this code path. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can be strict without locking. You can use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK I've complicated the code. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, there's no way that I can think of to make this accurate without locking. Review the code and let me know. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
|
||||||||
private readonly ConcurrentQueue<SocketSender> _queue = new(); | ||||||||
private int _count; | ||||||||
private readonly PipeScheduler _scheduler; | ||||||||
private bool _disposed; | ||||||||
|
||||||||
public SocketSenderPool(PipeScheduler scheduler) | ||||||||
{ | ||||||||
_scheduler = scheduler; | ||||||||
} | ||||||||
|
||||||||
public SocketSender Rent() | ||||||||
{ | ||||||||
if (_queue.TryDequeue(out var sender)) | ||||||||
{ | ||||||||
Interlocked.Decrement(ref _count); | ||||||||
return sender; | ||||||||
} | ||||||||
return new SocketSender(_scheduler); | ||||||||
} | ||||||||
|
||||||||
public void Return(SocketSender sender) | ||||||||
{ | ||||||||
if (Volatile.Read(ref _disposed)) | ||||||||
davidfowl marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||
{ | ||||||||
// We disposed the queue | ||||||||
davidfowl marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||
sender.Dispose(); | ||||||||
return; | ||||||||
} | ||||||||
|
||||||||
// Add this sender back to the queue if we haven't crossed the max | ||||||||
var count = Volatile.Read(ref _count); | ||||||||
davidfowl marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||
while (count < MaxQueueSize) | ||||||||
{ | ||||||||
var prev = Interlocked.CompareExchange(ref _count, count + 1, count); | ||||||||
|
||||||||
if (prev == count) | ||||||||
{ | ||||||||
sender.Reset(); | ||||||||
_queue.Enqueue(sender); | ||||||||
return; | ||||||||
} | ||||||||
|
||||||||
count = prev; | ||||||||
} | ||||||||
|
||||||||
// Over the limit | ||||||||
sender.Dispose(); | ||||||||
} | ||||||||
|
||||||||
public void Dispose() | ||||||||
{ | ||||||||
if (!_disposed) | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is thread safety an issue when disposing? What happens if there is a sender that hasn't been returned to the queue? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's not a concern. It'll probably leak. We have that same issue with the memory pool. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To clarify, in the face of ungraceful shutdown, bad things can happen with both the memory pool and now this pool. |
||||||||
{ | ||||||||
_disposed = true; | ||||||||
while (_queue.TryDequeue(out var sender)) | ||||||||
{ | ||||||||
sender.Dispose(); | ||||||||
} | ||||||||
} | ||||||||
} | ||||||||
} | ||||||||
} |
This file was deleted.
Uh oh!
There was an error while loading. Please reload this page.