Skip to content

Commit 73c2ce3

Browse files
authored
Enable cancellation for anonymous pipes and non-async named pipes on Windows (#72503)
* Enable cancellation for anonymous pipes and non-async named pipes on Windows Although ReadAsync, WriteAsync, and WaitForConnectionAsync on pipes all accept a CancellationToken, that token is only usable on Windows for canceling an in-flight operation when the pipe is using overlapped I/O. If the pipe was created for non-overlapped I/O, as is the case for anonymous pipes and can be the case for named pipes, the token stops being useful for anything other than an up-front cancellation check. This change fixes that by using CancelSynchronousIo to cancel the synchronous I/O performed as part of these async operations, which are implemented as async-over-sync (queueing to the thread pool a work item that performs the synchronous I/O). (The Unix implementation already supports cancellation in these situations.) * Address PR feedback (tweak comments)
1 parent c046354 commit 73c2ce3

File tree

10 files changed

+398
-119
lines changed

10 files changed

+398
-119
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System;
5+
using System.Runtime.InteropServices;
6+
using System.Threading;
7+
using Microsoft.Win32.SafeHandles;
8+
9+
internal static partial class Interop
10+
{
11+
internal static partial class Kernel32
12+
{
13+
[LibraryImport(Libraries.Kernel32, SetLastError = true)]
14+
[return: MarshalAs(UnmanagedType.Bool)]
15+
internal static unsafe partial bool CancelSynchronousIo(SafeThreadHandle hThread);
16+
}
17+
}

src/libraries/Common/src/Interop/Windows/Kernel32/Interop.OpenThread.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ internal static partial class Interop
88
{
99
internal static partial class Kernel32
1010
{
11+
internal const int THREAD_TERMINATE = 0x0001;
12+
1113
[LibraryImport(Libraries.Kernel32, SetLastError = true)]
12-
internal static partial SafeThreadHandle OpenThread(int access, [MarshalAs(UnmanagedType.Bool)] bool inherit, int threadId);
14+
internal static partial SafeThreadHandle OpenThread(int dwDesiredAccess, [MarshalAs(UnmanagedType.Bool)] bool bInheritHandle, int dwThreadId);
1315
}
1416
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System;
5+
using System.Runtime.InteropServices;
6+
7+
namespace Microsoft.Win32.SafeHandles
8+
{
9+
internal sealed class SafeThreadHandle : SafeHandle
10+
{
11+
public SafeThreadHandle() : base(invalidHandleValue: 0, ownsHandle: true) { }
12+
13+
public override bool IsInvalid => handle is 0 or -1;
14+
15+
protected override bool ReleaseHandle() => Interop.Kernel32.CloseHandle(handle);
16+
}
17+
}

src/libraries/Common/tests/StreamConformanceTests/System/IO/StreamConformanceTests.cs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -529,29 +529,33 @@ protected async Task ValidatePrecanceledOperations_ThrowsCancellationException(S
529529
}
530530
}
531531

532-
protected async Task ValidateCancelableReadAsyncTask_AfterInvocation_ThrowsCancellationException(Stream stream)
532+
protected async Task ValidateCancelableReadAsyncTask_AfterInvocation_ThrowsCancellationException(Stream stream, int cancellationDelay)
533533
{
534534
if (!stream.CanRead || !FullyCancelableOperations)
535535
{
536536
return;
537537
}
538538

539539
var cts = new CancellationTokenSource();
540+
540541
Task<int> t = stream.ReadAsync(new byte[1], 0, 1, cts.Token);
541-
cts.Cancel();
542+
543+
cts.CancelAfter(cancellationDelay);
542544
await AssertCanceledAsync(cts.Token, () => t);
543545
}
544546

545-
protected async Task ValidateCancelableReadAsyncValueTask_AfterInvocation_ThrowsCancellationException(Stream stream)
547+
protected async Task ValidateCancelableReadAsyncValueTask_AfterInvocation_ThrowsCancellationException(Stream stream, int cancellationDelay)
546548
{
547549
if (!stream.CanRead || !FullyCancelableOperations)
548550
{
549551
return;
550552
}
551553

552554
var cts = new CancellationTokenSource();
555+
553556
Task<int> t = stream.ReadAsync(new byte[1], cts.Token).AsTask();
554-
cts.Cancel();
557+
558+
cts.CancelAfter(cancellationDelay);
555559
await AssertCanceledAsync(cts.Token, () => t);
556560
}
557561

@@ -1671,26 +1675,30 @@ public virtual async Task ReadWriteAsync_PrecanceledOperations_ThrowsCancellatio
16711675
}
16721676
}
16731677

1674-
[Fact]
1678+
[Theory]
1679+
[InlineData(0)]
1680+
[InlineData(100)]
16751681
[ActiveIssue("https://github.com/dotnet/runtime/issues/67853", TestPlatforms.tvOS)]
16761682
[SkipOnPlatform(TestPlatforms.LinuxBionic, "SElinux blocks UNIX sockets")]
1677-
public virtual async Task ReadAsync_CancelPendingTask_ThrowsCancellationException()
1683+
public virtual async Task ReadAsync_CancelPendingTask_ThrowsCancellationException(int cancellationDelay)
16781684
{
16791685
using StreamPair streams = await CreateConnectedStreamsAsync();
16801686
(Stream writeable, Stream readable) = GetReadWritePair(streams);
16811687

1682-
await ValidateCancelableReadAsyncTask_AfterInvocation_ThrowsCancellationException(readable);
1688+
await ValidateCancelableReadAsyncTask_AfterInvocation_ThrowsCancellationException(readable, cancellationDelay);
16831689
}
16841690

1685-
[Fact]
1691+
[Theory]
1692+
[InlineData(0)]
1693+
[InlineData(100)]
16861694
[ActiveIssue("https://github.com/dotnet/runtime/issues/67853", TestPlatforms.tvOS)]
16871695
[SkipOnPlatform(TestPlatforms.LinuxBionic, "SElinux blocks UNIX sockets")]
1688-
public virtual async Task ReadAsync_CancelPendingValueTask_ThrowsCancellationException()
1696+
public virtual async Task ReadAsync_CancelPendingValueTask_ThrowsCancellationException(int cancellationDelay)
16891697
{
16901698
using StreamPair streams = await CreateConnectedStreamsAsync();
16911699
(Stream writeable, Stream readable) = GetReadWritePair(streams);
16921700

1693-
await ValidateCancelableReadAsyncValueTask_AfterInvocation_ThrowsCancellationException(readable);
1701+
await ValidateCancelableReadAsyncValueTask_AfterInvocation_ThrowsCancellationException(readable, cancellationDelay);
16941702
}
16951703

16961704
[Fact]

src/libraries/System.Diagnostics.Process/src/Microsoft/Win32/SafeHandles/SafeThreadHandle.cs

Lines changed: 0 additions & 38 deletions
This file was deleted.

src/libraries/System.Diagnostics.Process/src/System.Diagnostics.Process.csproj

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,8 +211,9 @@
211211
Link="Common\Interop\Windows\Interop.MAX_PATH.cs" />
212212
<Compile Include="$(CommonPath)System\HResults.cs"
213213
Link="Common\System\HResults.cs" />
214+
<Compile Include="$(CommonPath)Microsoft\Win32\SafeHandles\SafeThreadHandle.cs"
215+
Link="Microsoft\Win32\SafeHandles\SafeThreadHandle.cs" />
214216
<Compile Include="Microsoft\Win32\SafeHandles\SafeProcessHandle.Windows.cs" />
215-
<Compile Include="Microsoft\Win32\SafeHandles\SafeThreadHandle.cs" />
216217
<Compile Include="System\Diagnostics\PerformanceCounterLib.cs" />
217218
<Compile Include="System\Diagnostics\Process.Windows.cs" />
218219
<Compile Include="System\Diagnostics\ProcessManager.Windows.cs" />

src/libraries/System.IO.Pipes/src/System.IO.Pipes.csproj

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
Link="Common\System\Threading\Tasks\TaskToApm.cs" />
2929
</ItemGroup>
3030
<ItemGroup Condition="'$(TargetPlatformIdentifier)' == 'windows'">
31+
<Compile Include="$(CommonPath)Microsoft\Win32\SafeHandles\SafeThreadHandle.cs"
32+
Link="Microsoft\Win32\SafeHandles\SafeThreadHandle.cs" />
3133
<Compile Include="$(CommonPath)Interop\Windows\Interop.Libraries.cs"
3234
Link="Common\Interop\Windows\Interop.Libraries.cs" />
3335
<Compile Include="$(CommonPath)Interop\Windows\Kernel32\Interop.CloseHandle.cs"
@@ -94,12 +96,23 @@
9496
Link="Common\Interop\Windows\Interop.ImpersonateNamedPipeClient.cs" />
9597
<Compile Include="$(CommonPath)System\IO\Win32Marshal.cs"
9698
Link="Common\CoreLib\System\IO\Win32Marshal.cs" />
99+
<Compile Include="$(CommonPath)Interop\Windows\Kernel32\Interop.CreateNamedPipeClient.cs"
100+
Link="Common\Interop\Windows\Interop.CreateNamedPipeClient.cs" />
101+
<Compile Include="$(CommonPath)Interop\Windows\Kernel32\Interop.LoadLibraryEx_IntPtr.cs"
102+
Link="Common\Interop\Windows\Interop.LoadLibraryEx_IntPtr.cs" />
103+
<Compile Include="$(CommonPath)Interop\Windows\Kernel32\Interop.OpenThread.cs"
104+
Link="Common\Interop\Windows\Interop.OpenThread.cs" />
105+
<Compile Include="$(CommonPath)Interop\Windows\Kernel32\Interop.CancelSynchronousIo.cs"
106+
Link="Common\Interop\Windows\Interop.CancelSynchronousIo.cs" />
107+
<Compile Include="$(CommonPath)Interop\Windows\Kernel32\Interop.GetCurrentThreadId.cs"
108+
Link="Common\Interop\Windows\Interop.GetCurrentThreadId.cs" />
97109
<Compile Include="Microsoft\Win32\SafeHandles\SafePipeHandle.Windows.cs" />
98110
<Compile Include="System\IO\Pipes\AnonymousPipeServerStreamAcl.cs" />
99111
<Compile Include="System\IO\Pipes\AnonymousPipeServerStream.Windows.cs" />
100112
<Compile Include="System\IO\Pipes\NamedPipeServerStreamAcl.cs" />
101113
<Compile Include="System\IO\Pipes\NamedPipeClientStream.Windows.cs" />
102114
<Compile Include="System\IO\Pipes\NamedPipeServerStream.Windows.cs" />
115+
<Compile Include="System\IO\Pipes\NamedPipeServerStream.Win32.cs" />
103116
<Compile Include="System\IO\Pipes\PipeAccessRights.cs" />
104117
<Compile Include="System\IO\Pipes\PipeAccessRule.cs" />
105118
<Compile Include="System\IO\Pipes\PipeAuditRule.cs" />
@@ -108,14 +121,6 @@
108121
<Compile Include="System\IO\Pipes\PipeStream.ValueTaskSource.cs" />
109122
<Compile Include="System\IO\Pipes\PipeStream.Windows.cs" />
110123
</ItemGroup>
111-
<!-- Windows : Win32 only -->
112-
<ItemGroup Condition="'$(TargetPlatformIdentifier)' == 'windows'">
113-
<Compile Include="$(CommonPath)Interop\Windows\Kernel32\Interop.CreateNamedPipeClient.cs"
114-
Link="Common\Interop\Windows\Interop.CreateNamedPipeClient.cs" />
115-
<Compile Include="$(CommonPath)Interop\Windows\Kernel32\Interop.LoadLibraryEx_IntPtr.cs"
116-
Link="Common\Interop\Windows\Interop.LoadLibraryEx_IntPtr.cs" />
117-
<Compile Include="System\IO\Pipes\NamedPipeServerStream.Win32.cs" />
118-
</ItemGroup>
119124
<ItemGroup Condition="'$(TargetPlatformIdentifier)' == 'Unix'">
120125
<Compile Include="Microsoft\Win32\SafeHandles\SafePipeHandle.Unix.cs" />
121126
<Compile Include="System\IO\Pipes\AnonymousPipeServerStream.Unix.cs" />
@@ -176,6 +181,7 @@
176181
<ItemGroup Condition="'$(TargetPlatformIdentifier)' == 'windows'">
177182
<Reference Include="System.Collections.NonGeneric" />
178183
<Reference Include="System.Security.Claims" />
184+
<Reference Include="System.Threading.ThreadPool" />
179185
</ItemGroup>
180186
<ItemGroup Condition="'$(TargetPlatformIdentifier)' == 'Unix'">
181187
<Reference Include="Microsoft.Win32.Primitives" />

src/libraries/System.IO.Pipes/src/System/IO/Pipes/NamedPipeServerStream.Windows.cs

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// The .NET Foundation licenses this file to you under the MIT license.
33

44
using System.Diagnostics;
5+
using System.Runtime.CompilerServices;
56
using System.Runtime.ExceptionServices;
67
using System.Runtime.InteropServices;
78
using System.Security.AccessControl;
@@ -168,8 +169,7 @@ public void WaitForConnection()
168169

169170
if (IsAsync)
170171
{
171-
ValueTask vt = WaitForConnectionCoreAsync(CancellationToken.None);
172-
vt.AsTask().GetAwaiter().GetResult();
172+
WaitForConnectionCoreAsync(CancellationToken.None).AsTask().GetAwaiter().GetResult();
173173
}
174174
else
175175
{
@@ -183,33 +183,58 @@ public void WaitForConnection()
183183
}
184184

185185
// pipe already connected
186-
if (errorCode == Interop.Errors.ERROR_PIPE_CONNECTED && State == PipeState.Connected)
186+
if (State == PipeState.Connected)
187187
{
188188
throw new InvalidOperationException(SR.InvalidOperation_PipeAlreadyConnected);
189189
}
190+
190191
// If we reach here then a connection has been established. This can happen if a client
191192
// connects in the interval between the call to CreateNamedPipe and the call to ConnectNamedPipe.
192193
// In this situation, there is still a good connection between client and server, even though
193194
// ConnectNamedPipe returns zero.
194195
}
196+
195197
State = PipeState.Connected;
196198
}
197199
}
198200

199-
public Task WaitForConnectionAsync(CancellationToken cancellationToken)
201+
public Task WaitForConnectionAsync(CancellationToken cancellationToken) =>
202+
cancellationToken.IsCancellationRequested ? Task.FromCanceled(cancellationToken) :
203+
IsAsync ? WaitForConnectionCoreAsync(cancellationToken).AsTask() :
204+
AsyncOverSyncWaitForConnection(cancellationToken);
205+
206+
private async Task AsyncOverSyncWaitForConnection(CancellationToken cancellationToken)
200207
{
201-
if (cancellationToken.IsCancellationRequested)
202-
{
203-
return Task.FromCanceled(cancellationToken);
204-
}
208+
// Create the work item state object. This is used to pass around state through various APIs,
209+
// while also serving double duty as the work item used to queue the operation to the thread pool.
210+
var workItem = new SyncAsyncWorkItem();
211+
212+
// Queue the work to the thread pool. This is implemented as a custom awaiter that queues the
213+
// awaiter itself to the thread pool.
214+
await workItem;
205215

206-
if (!IsAsync)
216+
// Register for cancellation.
217+
using (workItem.RegisterCancellation(cancellationToken))
207218
{
208-
return Task.Factory.StartNew(s => ((NamedPipeServerStream)s!).WaitForConnection(),
209-
this, cancellationToken, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
219+
try
220+
{
221+
// Perform the wait.
222+
WaitForConnection();
223+
}
224+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
225+
{
226+
// If the write fails because of cancellation, it will have been a Win32 error code
227+
// that WriteCore translated into an OperationCanceledException without a stored
228+
// CancellationToken. We want to ensure the token is stored.
229+
throw new OperationCanceledException(cancellationToken);
230+
}
231+
finally
232+
{
233+
// Prior to calling Dispose on the CancellationTokenRegistration, we need to tell
234+
// the registration callback to exit if it's currently running; otherwise, we could deadlock.
235+
workItem.ContinueTryingToCancel = false;
236+
}
210237
}
211-
212-
return WaitForConnectionCoreAsync(cancellationToken).AsTask();
213238
}
214239

215240
public void Disconnect()

0 commit comments

Comments
 (0)