Skip to content

Commit e77b937

Browse files
Fix cancel in PipeReader.ReadAtLeastAsync (#66752)
1 parent 59b5585 commit e77b937

File tree

2 files changed

+37
-3
lines changed

2 files changed

+37
-3
lines changed

src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -347,15 +347,15 @@ internal ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken)
347347
ValueTask<FlushResult> result;
348348
lock (SyncObj)
349349
{
350-
PrepareFlush(out completionData, out result, cancellationToken);
350+
PrepareFlushUnsynchronized(out completionData, out result, cancellationToken);
351351
}
352352

353353
TrySchedule(ReaderScheduler, completionData);
354354

355355
return result;
356356
}
357357

358-
private void PrepareFlush(out CompletionData completionData, out ValueTask<FlushResult> result, CancellationToken cancellationToken)
358+
private void PrepareFlushUnsynchronized(out CompletionData completionData, out ValueTask<FlushResult> result, CancellationToken cancellationToken)
359359
{
360360
var completeReader = CommitUnsynchronized();
361361

@@ -691,6 +691,9 @@ internal ValueTask<ReadResult> ReadAtLeastAsync(int minimumBytes, CancellationTo
691691

692692
// We also need to flip the reading state off
693693
_operationState.EndRead();
694+
695+
// Begin read again to wire up cancellation token
696+
_readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this);
694697
}
695698

696699
// If the writer is currently paused and we are about the wait for more data then this would deadlock.
@@ -1057,7 +1060,7 @@ internal ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, Cancella
10571060
WriteMultiSegment(source.Span);
10581061
}
10591062

1060-
PrepareFlush(out completionData, out result, cancellationToken);
1063+
PrepareFlushUnsynchronized(out completionData, out result, cancellationToken);
10611064
}
10621065

10631066
TrySchedule(ReaderScheduler, completionData);

src/libraries/System.IO.Pipelines/tests/PipeReaderReadAtLeastAsyncTests.cs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,5 +162,36 @@ public async Task WriteAndCancellingPendingReadBeforeReadAtLeastAsync()
162162
Assert.True(result.IsCanceled);
163163
PipeReader.AdvanceTo(buffer.End);
164164
}
165+
166+
[Fact]
167+
public Task ReadAtLeastAsyncCancelableWhenWaitingForMoreData()
168+
{
169+
CancellationTokenSource cts = new CancellationTokenSource();
170+
ValueTask<ReadResult> task = PipeReader.ReadAtLeastAsync(1, cts.Token);
171+
cts.Cancel();
172+
return Assert.ThrowsAsync<OperationCanceledException>(async () => await task);
173+
}
174+
175+
[Fact]
176+
public async Task ReadAtLeastAsyncCancelableAfterReadingSome()
177+
{
178+
CancellationTokenSource cts = new CancellationTokenSource();
179+
await Pipe.WriteAsync(new byte[10], default);
180+
ValueTask<ReadResult> task = PipeReader.ReadAtLeastAsync(11, cts.Token);
181+
cts.Cancel();
182+
await Assert.ThrowsAsync<OperationCanceledException>(async () => await task);
183+
}
184+
185+
[Fact]
186+
public async Task ReadAtLeastAsyncCancelableAfterReadingSomeAndWritingAfterStartingRead()
187+
{
188+
CancellationTokenSource cts = new CancellationTokenSource();
189+
await Pipe.WriteAsync(new byte[10], default);
190+
ValueTask<ReadResult> task = PipeReader.ReadAtLeastAsync(12, cts.Token);
191+
// Write, but not enough to unblock ReadAtLeastAsync
192+
await Pipe.WriteAsync(new byte[1], default);
193+
cts.Cancel();
194+
await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await task);
195+
}
165196
}
166197
}

0 commit comments

Comments
 (0)