Skip to content

Commit e2acbb9

Browse files
authored
Delay socket receive/send until first read/flush (#34458)
* Delay socket receive/send until first read/flush - Today when the socket connection is accepted or connection, we implicitly start reading and writing from the socket. This can prevent certain scenarios where users want to get access to the raw socket before any operations happen (like in hand off scenarios). This change defers the reads and writes until read or flush is called on the transport's IDuplexPipe. - Added test to make sure we can read from the socket via IConnectionocket feature. - Added DuplicateAndClose test
1 parent 1e52c74 commit e2acbb9

File tree

7 files changed

+347
-15
lines changed

7 files changed

+347
-15
lines changed

src/Servers/Kestrel/Transport.Sockets/src/Client/SocketConnectionFactory.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ public async ValueTask<ConnectionContext> ConnectAsync(EndPoint endpoint, Cancel
8181
_outputOptions,
8282
_options.WaitForDataBeforeAllocatingBuffer);
8383

84-
socketConnection.Start();
8584
return socketConnection;
8685
}
8786

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System.IO.Pipelines;
5+
6+
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
7+
{
8+
internal sealed partial class SocketConnection
9+
{
10+
// We could implement this on SocketConnection to remove an extra allocation but this is a
11+
// bit cleaner
12+
private class SocketDuplexPipe : IDuplexPipe
13+
{
14+
public SocketDuplexPipe(SocketConnection connection)
15+
{
16+
Input = new SocketPipeReader(connection);
17+
Output = new SocketPipeWriter(connection);
18+
}
19+
20+
public PipeReader Input { get; }
21+
22+
public PipeWriter Output { get; }
23+
}
24+
}
25+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System.IO.Pipelines;
5+
6+
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
7+
{
8+
internal sealed partial class SocketConnection
9+
{
10+
private class SocketPipeReader : PipeReader
11+
{
12+
private readonly SocketConnection _socketConnection;
13+
private readonly PipeReader _reader;
14+
15+
public SocketPipeReader(SocketConnection socketConnection)
16+
{
17+
_socketConnection = socketConnection;
18+
_reader = socketConnection.InnerTransport.Input;
19+
}
20+
21+
public override void AdvanceTo(SequencePosition consumed)
22+
{
23+
_reader.AdvanceTo(consumed);
24+
}
25+
26+
public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
27+
{
28+
_reader.AdvanceTo(consumed, examined);
29+
}
30+
31+
public override void CancelPendingRead()
32+
{
33+
_reader.CancelPendingRead();
34+
}
35+
36+
public override void Complete(Exception? exception = null)
37+
{
38+
_reader.Complete(exception);
39+
}
40+
41+
public override ValueTask CompleteAsync(Exception? exception = null)
42+
{
43+
return _reader.CompleteAsync(exception);
44+
}
45+
46+
public override Task CopyToAsync(PipeWriter destination, CancellationToken cancellationToken = default)
47+
{
48+
_socketConnection.EnsureStarted();
49+
return _reader.CopyToAsync(destination, cancellationToken);
50+
}
51+
52+
public override Task CopyToAsync(Stream destination, CancellationToken cancellationToken = default)
53+
{
54+
_socketConnection.EnsureStarted();
55+
return _reader.CopyToAsync(destination, cancellationToken);
56+
}
57+
58+
protected override ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumSize, CancellationToken cancellationToken)
59+
{
60+
_socketConnection.EnsureStarted();
61+
return _reader.ReadAtLeastAsync(minimumSize, cancellationToken);
62+
}
63+
64+
public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
65+
{
66+
_socketConnection.EnsureStarted();
67+
return _reader.ReadAsync(cancellationToken);
68+
}
69+
70+
public override bool TryRead(out ReadResult result)
71+
{
72+
_socketConnection.EnsureStarted();
73+
return _reader.TryRead(out result);
74+
}
75+
}
76+
}
77+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System.IO.Pipelines;
5+
6+
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
7+
{
8+
internal sealed partial class SocketConnection
9+
{
10+
private class SocketPipeWriter : PipeWriter
11+
{
12+
private readonly SocketConnection _socketConnection;
13+
private readonly PipeWriter _writer;
14+
15+
public SocketPipeWriter(SocketConnection socketConnection)
16+
{
17+
_socketConnection = socketConnection;
18+
_writer = socketConnection.InnerTransport.Output;
19+
}
20+
21+
public override bool CanGetUnflushedBytes => _writer.CanGetUnflushedBytes;
22+
23+
public override long UnflushedBytes => _writer.UnflushedBytes;
24+
25+
public override void Advance(int bytes)
26+
{
27+
_writer.Advance(bytes);
28+
}
29+
30+
public override void CancelPendingFlush()
31+
{
32+
_writer.CancelPendingFlush();
33+
}
34+
35+
public override void Complete(Exception? exception = null)
36+
{
37+
_writer.Complete(exception);
38+
}
39+
40+
public override ValueTask CompleteAsync(Exception? exception = null)
41+
{
42+
return _writer.CompleteAsync(exception);
43+
}
44+
45+
public override ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
46+
{
47+
_socketConnection.EnsureStarted();
48+
return _writer.WriteAsync(source, cancellationToken);
49+
}
50+
51+
public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
52+
{
53+
_socketConnection.EnsureStarted();
54+
return _writer.FlushAsync(cancellationToken);
55+
}
56+
57+
public override Memory<byte> GetMemory(int sizeHint = 0)
58+
{
59+
return _writer.GetMemory(sizeHint);
60+
}
61+
62+
public override Span<byte> GetSpan(int sizeHint = 0)
63+
{
64+
return _writer.GetSpan(sizeHint);
65+
}
66+
}
67+
}
68+
}

src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.cs

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ internal sealed partial class SocketConnection : TransportConnection
3333
private readonly TaskCompletionSource _waitForConnectionClosedTcs = new TaskCompletionSource();
3434
private bool _connectionClosed;
3535
private readonly bool _waitForData;
36+
private int _connectionStarted;
3637

3738
internal SocketConnection(Socket socket,
3839
MemoryPool<byte> memoryPool,
@@ -67,31 +68,32 @@ internal SocketConnection(Socket socket,
6768

6869
var pair = DuplexPipe.CreateConnectionPair(inputOptions, outputOptions);
6970

70-
// Set the transport and connection id
71-
Transport = _originalTransport = pair.Transport;
71+
_originalTransport = pair.Transport;
7272
Application = pair.Application;
7373

74+
Transport = new SocketDuplexPipe(this);
75+
7476
InitiaizeFeatures();
7577
}
7678

79+
public IDuplexPipe InnerTransport => _originalTransport;
80+
7781
public PipeWriter Input => Application.Output;
7882

7983
public PipeReader Output => Application.Input;
8084

8185
public override MemoryPool<byte> MemoryPool { get; }
8286

83-
public void Start()
87+
private void EnsureStarted()
8488
{
85-
try
89+
if (_connectionStarted == 1 || Interlocked.CompareExchange(ref _connectionStarted, 1, 0) == 1)
8690
{
87-
// Spawn send and receive logic
88-
_receivingTask = DoReceive();
89-
_sendingTask = DoSend();
90-
}
91-
catch (Exception ex)
92-
{
93-
_trace.LogError(0, ex, $"Unexpected exception in {nameof(SocketConnection)}.{nameof(Start)}.");
91+
return;
9492
}
93+
94+
// Offload these to avoid potentially blocking the first read/write/flush
95+
_receivingTask = Task.Run(DoReceive);
96+
_sendingTask = Task.Run(DoSend);
9597
}
9698

9799
public override void Abort(ConnectionAbortedException abortReason)
@@ -106,6 +108,9 @@ public override void Abort(ConnectionAbortedException abortReason)
106108
// Only called after connection middleware is complete which means the ConnectionClosed token has fired.
107109
public override async ValueTask DisposeAsync()
108110
{
111+
// Just in case we haven't started the connection, start it here so we can clean up properly.
112+
EnsureStarted();
113+
109114
_originalTransport.Input.Complete();
110115
_originalTransport.Output.Complete();
111116

@@ -125,7 +130,7 @@ public override async ValueTask DisposeAsync()
125130
}
126131
catch (Exception ex)
127132
{
128-
_trace.LogError(0, ex, $"Unexpected exception in {nameof(SocketConnection)}.{nameof(Start)}.");
133+
_trace.LogError(0, ex, $"Unexpected exception in {nameof(SocketConnection)}.");
129134
}
130135
finally
131136
{

src/Servers/Kestrel/Transport.Sockets/src/SocketConnectionListener.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,6 @@ internal void Bind()
136136
setting.OutputOptions,
137137
waitForData: _options.WaitForDataBeforeAllocatingBuffer);
138138

139-
connection.Start();
140-
141139
_settingsIndex = (_settingsIndex + 1) % _settingsCount;
142140

143141
return connection;

0 commit comments

Comments
 (0)