Skip to content

Add UnixFileStreamStrategy #55191

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,12 @@ public static IEnumerable<object[]> MemberData_FileStreamAsyncWriting()
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] // Browser PNSE: Cannot wait on monitors
[PlatformSpecific(TestPlatforms.Windows)]
public Task ManyConcurrentWriteAsyncs()
{
// For inner loop, just test one case
return ManyConcurrentWriteAsyncs_OuterLoop(
useAsync: OperatingSystem.IsWindows(),
useAsync: true,
presize: false,
exposeHandle: false,
cancelable: true,
Expand All @@ -193,6 +194,7 @@ public Task ManyConcurrentWriteAsyncs()
}

[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[PlatformSpecific(TestPlatforms.Windows)] // testing undocumented feature that's legacy in the Windows implementation
[MemberData(nameof(MemberData_FileStreamAsyncWriting))]
[OuterLoop] // many combinations: we test just one in inner loop and the rest outer
public async Task ManyConcurrentWriteAsyncs_OuterLoop(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<IncludeRemoteExecutor>true</IncludeRemoteExecutor>
<!-- Windows is currently the only OS for which we provide a new strategy, so we test the Net5Compat only for Windows -->
<TargetFrameworks>$(NetCoreAppCurrent)-windows</TargetFrameworks>
<TargetFrameworks>$(NetCoreAppCurrent)-windows;$(NetCoreAppCurrent)-Unix</TargetFrameworks>

<WasmXHarnessMonoArgs>--working-dir=/test-dir</WasmXHarnessMonoArgs>
</PropertyGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,12 @@ public sealed partial class SafeFileHandle : SafeHandleZeroOrMinusOneIsInvalid
internal ThreadPoolValueTaskSource GetThreadPoolValueTaskSource() =>
Interlocked.Exchange(ref _reusableThreadPoolValueTaskSource, null) ?? new ThreadPoolValueTaskSource(this);

private void TryToReuse(ThreadPoolValueTaskSource source)
{
Interlocked.CompareExchange(ref _reusableThreadPoolValueTaskSource, source, null);
}

/// <summary>
/// A reusable <see cref="IValueTaskSource"/> implementation that
/// queues asynchronous <see cref="RandomAccess"/> operations to
/// be completed synchronously on the thread pool.
/// </summary>
internal sealed class ThreadPoolValueTaskSource : IThreadPoolWorkItem, IValueTaskSource<int>, IValueTaskSource<long>
internal sealed class ThreadPoolValueTaskSource : IThreadPoolWorkItem, IValueTaskSource<int>, IValueTaskSource<long>, IValueTaskSource
{
private readonly SafeFileHandle _fileHandle;
private ManualResetValueTaskSourceCore<long> _source;
Expand Down Expand Up @@ -68,7 +63,7 @@ private long GetResultAndRelease(short token)
finally
{
_source.Reset();
_fileHandle.TryToReuse(this);
Volatile.Write(ref _fileHandle._reusableThreadPoolValueTaskSource, this);
}
}

Expand All @@ -77,6 +72,7 @@ public void OnCompleted(Action<object?> continuation, object? state, short token
_source.OnCompleted(continuation, state, token, flags);
int IValueTaskSource<int>.GetResult(short token) => (int) GetResultAndRelease(token);
long IValueTaskSource<long>.GetResult(short token) => GetResultAndRelease(token);
void IValueTaskSource.GetResult(short token) => GetResultAndRelease(token);

private void ExecuteInternal()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Diagnostics;
using System.IO;
using System.IO.Strategies;
using System.Threading;

namespace Microsoft.Win32.SafeHandles
{
Expand All @@ -28,6 +29,10 @@ private SafeFileHandle(bool ownsHandle)

internal bool CanSeek => !IsClosed && GetCanSeek();

internal ThreadPoolBoundHandle? ThreadPoolBinding => null;

internal void EnsureThreadPoolBindingInitialized() { /* nop */ }

/// <summary>Opens the specified file with the requested flags and mode.</summary>
/// <param name="path">The path to the file.</param>
/// <param name="flags">The flags with which to open the file.</param>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ public SafeFileHandle() : base(true)

internal bool CanSeek => !IsClosed && GetFileType() == Interop.Kernel32.FileTypes.FILE_TYPE_DISK;

internal bool IsPipe => GetFileType() == Interop.Kernel32.FileTypes.FILE_TYPE_PIPE;

internal ThreadPoolBoundHandle? ThreadPoolBinding { get; set; }

internal static unsafe SafeFileHandle Open(string fullPath, FileMode mode, FileAccess access, FileShare share, FileOptions options, long preallocationSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\FileStreamHelpers.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\FileStreamStrategy.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\Net5CompatFileStreamStrategy.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\OSFileStreamStrategy.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\IObservable.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\IObserver.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\IProgress.cs" />
Expand Down Expand Up @@ -1813,7 +1814,6 @@
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\Net5CompatFileStreamStrategy.CompletionSource.Windows.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\Net5CompatFileStreamStrategy.Windows.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\SyncWindowsFileStreamStrategy.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\WindowsFileStreamStrategy.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\PasteArguments.Windows.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Runtime\Loader\LibraryNameVariation.Windows.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Runtime\MemoryFailPoint.Windows.cs" />
Expand Down Expand Up @@ -2084,8 +2084,7 @@
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Enumeration\FileSystemEnumerator.Unix.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\FileStreamHelpers.Unix.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\Net5CompatFileStreamStrategy.Unix.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\Net5CompatFileStreamStrategy.Lock.OSX.cs" Condition="'$(IsOSXLike)' == 'true'" />
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\Net5CompatFileStreamStrategy.Lock.Unix.cs" Condition="'$(IsOSXLike)' != 'true'" />
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\UnixFileStreamStrategy.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\PasteArguments.Unix.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Runtime\Loader\LibraryNameVariation.Unix.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Runtime\MemoryFailPoint.Unix.cs" />
Expand Down
11 changes: 9 additions & 2 deletions src/libraries/System.Private.CoreLib/src/System/IO/FileStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -479,10 +479,17 @@ public override long Position

public override ValueTask DisposeAsync() => _strategy.DisposeAsync();

public override void CopyTo(Stream destination, int bufferSize) => _strategy.CopyTo(destination, bufferSize);
public override void CopyTo(Stream destination, int bufferSize)
{
ValidateCopyToArguments(destination, bufferSize);
_strategy.CopyTo(destination, bufferSize);
}

public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
=> _strategy.CopyToAsync(destination, bufferSize, cancellationToken);
{
ValidateCopyToArguments(destination, bufferSize);
return _strategy.CopyToAsync(destination, bufferSize, cancellationToken);
}

public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ internal static unsafe int ReadAtOffset(SafeFileHandle handle, Span<byte> buffer
{
fixed (byte* bufPtr = &MemoryMarshal.GetReference(buffer))
{
int result = Interop.Sys.PRead(handle, bufPtr, buffer.Length, fileOffset);
// The Windows implementation uses ReadFile, which ignores the offset if the handle
// isn't seekable. We do the same manually with PRead vs Read, in order to enable
// the function to be used by FileStream for all the same situations.
int result = handle.CanSeek ?
Interop.Sys.PRead(handle, bufPtr, buffer.Length, fileOffset) :
Interop.Sys.Read(handle, bufPtr, buffer.Length);
FileStreamHelpers.CheckFileCall(result, handle.Path);
return result;
}
Expand Down Expand Up @@ -67,7 +72,7 @@ internal static unsafe long ReadScatterAtOffset(SafeFileHandle handle, IReadOnly
return FileStreamHelpers.CheckFileCall(result, handle.Path);
}

private static ValueTask<int> ReadAtOffsetAsync(SafeFileHandle handle, Memory<byte> buffer, long fileOffset, CancellationToken cancellationToken)
internal static ValueTask<int> ReadAtOffsetAsync(SafeFileHandle handle, Memory<byte> buffer, long fileOffset, CancellationToken cancellationToken)
=> ScheduleSyncReadAtOffsetAsync(handle, buffer, fileOffset, cancellationToken);

private static ValueTask<long> ReadScatterAtOffsetAsync(SafeFileHandle handle, IReadOnlyList<Memory<byte>> buffers,
Expand All @@ -78,9 +83,14 @@ internal static unsafe int WriteAtOffset(SafeFileHandle handle, ReadOnlySpan<byt
{
fixed (byte* bufPtr = &MemoryMarshal.GetReference(buffer))
{
int result = Interop.Sys.PWrite(handle, bufPtr, buffer.Length, fileOffset);
// The Windows implementation uses WriteFile, which ignores the offset if the handle
// isn't seekable. We do the same manually with PWrite vs Write, in order to enable
// the function to be used by FileStream for all the same situations.
int result = handle.CanSeek ?
Interop.Sys.PWrite(handle, bufPtr, buffer.Length, fileOffset) :
Interop.Sys.Write(handle, bufPtr, buffer.Length);
FileStreamHelpers.CheckFileCall(result, handle.Path);
return result;
return result;
}
}

Expand Down Expand Up @@ -117,7 +127,7 @@ internal static unsafe long WriteGatherAtOffset(SafeFileHandle handle, IReadOnly
return FileStreamHelpers.CheckFileCall(result, handle.Path);
}

private static ValueTask<int> WriteAtOffsetAsync(SafeFileHandle handle, ReadOnlyMemory<byte> buffer, long fileOffset, CancellationToken cancellationToken)
internal static ValueTask<int> WriteAtOffsetAsync(SafeFileHandle handle, ReadOnlyMemory<byte> buffer, long fileOffset, CancellationToken cancellationToken)
=> ScheduleSyncWriteAtOffsetAsync(handle, buffer, fileOffset, cancellationToken);

private static ValueTask<long> WriteGatherAtOffsetAsync(SafeFileHandle handle, IReadOnlyList<ReadOnlyMemory<byte>> buffers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace System.IO.Strategies
{
internal sealed partial class AsyncWindowsFileStreamStrategy : WindowsFileStreamStrategy
internal sealed partial class AsyncWindowsFileStreamStrategy : OSFileStreamStrategy
{
internal AsyncWindowsFileStreamStrategy(SafeFileHandle handle, FileAccess access, FileShare share)
: base(handle, access, share)
Expand Down Expand Up @@ -98,12 +98,8 @@ private Exception HandleIOError(long positionBefore, int errorCode)
return SafeFileHandle.OverlappedValueTaskSource.GetIOError(errorCode, _fileHandle.Path);
}

public override Task FlushAsync(CancellationToken cancellationToken) => Task.CompletedTask; // no buffering = nothing to flush

public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
ValidateCopyToArguments(destination, bufferSize);

// Fail if the file was closed
if (_fileHandle.IsClosed)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ private int ReadSpan(Span<byte> destination, ArraySegment<byte> arraySegment)

// If we are reading from a device with no clear EOF like a
// serial port or a pipe, this will cause us to block incorrectly.
if (!_strategy.IsPipe)
if (_strategy.CanSeek)
{
// If we hit the end of the buffer and didn't have enough bytes, we must
// read some more from the underlying stream. However, if we got
Expand Down Expand Up @@ -340,9 +340,9 @@ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken
Debug.Assert((_readPos == 0 && _readLen == 0 && _writePos >= 0) || (_writePos == 0 && _readPos <= _readLen),
"We're either reading or writing, but not both.");

if (_strategy.IsPipe) // pipes have a very limited support for buffering
if (!_strategy.CanSeek)
{
return ReadFromPipeAsync(buffer, cancellationToken);
return ReadFromNonSeekableAsync(buffer, cancellationToken);
}

SemaphoreSlim semaphore = EnsureAsyncActiveSemaphoreInitialized();
Expand Down Expand Up @@ -383,9 +383,9 @@ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken
return ReadAsyncSlowPath(semaphoreLockTask, buffer, cancellationToken);
}

private async ValueTask<int> ReadFromPipeAsync(Memory<byte> destination, CancellationToken cancellationToken)
private async ValueTask<int> ReadFromNonSeekableAsync(Memory<byte> destination, CancellationToken cancellationToken)
{
Debug.Assert(_strategy.IsPipe);
Debug.Assert(!_strategy.CanSeek);

// Employ async waiting based on the same synchronization used in BeginRead of the abstract Stream.
await EnsureAsyncActiveSemaphoreInitialized().WaitAsync(cancellationToken).ConfigureAwait(false);
Expand Down Expand Up @@ -426,7 +426,7 @@ private async ValueTask<int> ReadFromPipeAsync(Memory<byte> destination, Cancell
private async ValueTask<int> ReadAsyncSlowPath(Task semaphoreLockTask, Memory<byte> buffer, CancellationToken cancellationToken)
{
Debug.Assert(_asyncActiveSemaphore != null);
Debug.Assert(!_strategy.IsPipe);
Debug.Assert(_strategy.CanSeek);

// Employ async waiting based on the same synchronization used in BeginRead of the abstract Stream.
await semaphoreLockTask.ConfigureAwait(false);
Expand Down Expand Up @@ -633,13 +633,13 @@ public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationTo
Debug.Assert(!_strategy.IsClosed, "FileStream ensures that strategy is not closed");
Debug.Assert((_readPos == 0 && _readLen == 0 && _writePos >= 0) || (_writePos == 0 && _readPos <= _readLen),
"We're either reading or writing, but not both.");
Debug.Assert(!_strategy.IsPipe || (_readPos == 0 && _readLen == 0),
Debug.Assert(_strategy.CanSeek || (_readPos == 0 && _readLen == 0),
"Win32FileStream must not have buffered data here! Pipes should be unidirectional.");

if (_strategy.IsPipe)
if (!_strategy.CanSeek)
{
// avoid async buffering with pipes, as doing so can lead to deadlocks (see comments in ReadFromPipeAsync)
return WriteToPipeAsync(buffer, cancellationToken);
return WriteToNonSeekableAsync(buffer, cancellationToken);
}

SemaphoreSlim semaphore = EnsureAsyncActiveSemaphoreInitialized();
Expand Down Expand Up @@ -690,9 +690,9 @@ public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationTo
return WriteAsyncSlowPath(semaphoreLockTask, buffer, cancellationToken);
}

private async ValueTask WriteToPipeAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
private async ValueTask WriteToNonSeekableAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
{
Debug.Assert(_strategy.IsPipe);
Debug.Assert(!_strategy.CanSeek);

await EnsureAsyncActiveSemaphoreInitialized().WaitAsync(cancellationToken).ConfigureAwait(false);
try
Expand All @@ -708,7 +708,7 @@ private async ValueTask WriteToPipeAsync(ReadOnlyMemory<byte> source, Cancellati
private async ValueTask WriteAsyncSlowPath(Task semaphoreLockTask, ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
{
Debug.Assert(_asyncActiveSemaphore != null);
Debug.Assert(!_strategy.IsPipe);
Debug.Assert(_strategy.CanSeek);

await semaphoreLockTask.ConfigureAwait(false);
try
Expand Down Expand Up @@ -878,7 +878,6 @@ private async Task FlushAsyncInternal(CancellationToken cancellationToken)

public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
ValidateCopyToArguments(destination, bufferSize);
EnsureNotClosed();
EnsureCanRead();

Expand Down Expand Up @@ -921,7 +920,6 @@ private async Task CopyToAsyncCore(Stream destination, int bufferSize, Cancellat

public override void CopyTo(Stream destination, int bufferSize)
{
ValidateCopyToArguments(destination, bufferSize);
EnsureNotClosed();
EnsureCanRead();

Expand Down
Loading