Skip to content

Commit 57e68b0

Browse files
BrennanConroyAndrew Stanton-Nurse
authored and
Andrew Stanton-Nurse
committed
Merged PR 3557: Revert "Wait to Complete Pipe"
1 parent 7fd42c4 commit 57e68b0

File tree

8 files changed

+12
-164
lines changed

8 files changed

+12
-164
lines changed

eng/PatchConfig.props

-6
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,4 @@ Later on, this will be checked using this condition:
4444
Microsoft.AspNetCore.CookiePolicy;
4545
</PackagesInPatch>
4646
</PropertyGroup>
47-
<PropertyGroup Condition=" '$(VersionPrefix)' == '2.1.14' ">
48-
<PackagesInPatch>
49-
Microsoft.AspNetCore.Http.Connections;
50-
Microsoft.AspNetCore.SignalR.Core;
51-
</PackagesInPatch>
52-
</PropertyGroup>
5347
</Project>

src/SignalR/clients/ts/FunctionalTests/selenium/run-tests.ts

+1-22
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,14 @@
11
import { ChildProcess, spawn } from "child_process";
2-
import * as _fs from "fs";
2+
import * as fs from "fs";
33
import { EOL } from "os";
44
import * as path from "path";
5-
import { promisify } from "util";
65
import { PassThrough, Readable } from "stream";
76

87
import { run } from "../../webdriver-tap-runner/lib";
98

109
import * as _debug from "debug";
1110
const debug = _debug("signalr-functional-tests:run");
1211

13-
const ARTIFACTS_DIR = path.resolve(__dirname, "..", "..", "..", "..", "artifacts");
14-
const LOGS_DIR = path.resolve(ARTIFACTS_DIR, "logs");
15-
16-
// Promisify things from fs we want to use.
17-
const fs = {
18-
createWriteStream: _fs.createWriteStream,
19-
exists: promisify(_fs.exists),
20-
mkdir: promisify(_fs.mkdir),
21-
};
22-
2312
process.on("unhandledRejection", (reason) => {
2413
console.error(`Unhandled promise rejection: ${reason}`);
2514
process.exit(1);
@@ -113,13 +102,6 @@ if (chromePath) {
113102
try {
114103
const serverPath = path.resolve(__dirname, "..", "bin", configuration, "netcoreapp2.1", "FunctionalTests.dll");
115104

116-
if (!await fs.exists(ARTIFACTS_DIR)) {
117-
await fs.mkdir(ARTIFACTS_DIR);
118-
}
119-
if (!await fs.exists(LOGS_DIR)) {
120-
await fs.mkdir(LOGS_DIR);
121-
}
122-
123105
debug(`Launching Functional Test Server: ${serverPath}`);
124106
const dotnet = spawn("dotnet", [serverPath], {
125107
env: {
@@ -135,9 +117,6 @@ if (chromePath) {
135117
}
136118
}
137119

138-
const logStream = fs.createWriteStream(path.resolve(LOGS_DIR, "ts.functionaltests.dotnet.log"));
139-
dotnet.stdout.pipe(logStream);
140-
141120
process.on("SIGINT", cleanup);
142121
process.on("exit", cleanup);
143122

src/SignalR/common/Http.Connections/src/Internal/HttpConnectionContext.cs

+3-25
Original file line numberDiff line numberDiff line change
@@ -274,35 +274,13 @@ private async Task WaitOnTasks(Task applicationTask, Task transportTask, bool cl
274274
// Cancel any pending flushes from back pressure
275275
Application?.Output.CancelPendingFlush();
276276

277-
// Normally it isn't safe to try and acquire this lock because the Send can hold onto it for a long time if there is backpressure
278-
// It is safe to wait for this lock now because the Send will be in one of 4 states
279-
// 1. In the middle of a write which is in the middle of being canceled by the CancelPendingFlush above, when it throws
280-
// an OperationCanceledException it will complete the PipeWriter which will make any other Send waiting on the lock
281-
// throw an InvalidOperationException if they call Write
282-
// 2. About to write and see that there is a pending cancel from the CancelPendingFlush, go to 1 to see what happens
283-
// 3. Enters the Send and sees the Dispose state from DisposeAndRemoveAsync and releases the lock
284-
// 4. No Send in progress
285-
await WriteLock.WaitAsync();
286-
try
287-
{
288-
// Complete the applications read loop
289-
Application?.Output.Complete(transportTask.Exception?.InnerException);
290-
}
291-
finally
292-
{
293-
WriteLock.Release();
294-
}
295-
296-
Log.WaitingForTransportAndApplication(_logger, TransportType);
297-
298-
// Wait for application so we can complete the writer safely
299-
await applicationTask.NoThrow();
300-
301-
// Shutdown application side now that it's finished
277+
// Shutdown both sides and wait for nothing
302278
Transport?.Output.Complete(applicationTask.Exception?.InnerException);
279+
Application?.Output.Complete(transportTask.Exception?.InnerException);
303280

304281
try
305282
{
283+
Log.WaitingForTransportAndApplication(_logger, TransportType);
306284
// A poorly written application *could* in theory get stuck forever and it'll show up as a memory leak
307285
await Task.WhenAll(applicationTask, transportTask);
308286
}

src/SignalR/common/Http.Connections/src/Internal/HttpConnectionDispatcher.cs

+5-10
Original file line numberDiff line numberDiff line change
@@ -511,14 +511,6 @@ private async Task ProcessSend(HttpContext context, HttpConnectionDispatcherOpti
511511

512512
context.Response.StatusCode = StatusCodes.Status404NotFound;
513513
context.Response.ContentType = "text/plain";
514-
515-
// There are no writes anymore (since this is the write "loop")
516-
// So it is safe to complete the writer
517-
// We complete the writer here because we already have the WriteLock acquired
518-
// and it's unsafe to complete outside of the lock
519-
// Other code isn't guaranteed to be able to acquire the lock before another write
520-
// even if CancelPendingFlush is called, and the other write could hang if there is backpressure
521-
connection.Application.Output.Complete();
522514
return;
523515
}
524516

@@ -557,8 +549,11 @@ private async Task ProcessDeleteAsync(HttpContext context)
557549

558550
Log.TerminatingConection(_logger);
559551

560-
// Dispose the connection, but don't wait for it. We assign it here so we can wait in tests
561-
connection.DisposeAndRemoveTask = _manager.DisposeAndRemoveAsync(connection, closeGracefully: false);
552+
// Complete the receiving end of the pipe
553+
connection.Application.Output.Complete();
554+
555+
// Dispose the connection gracefully, but don't wait for it. We assign it here so we can wait in tests
556+
connection.DisposeAndRemoveTask = _manager.DisposeAndRemoveAsync(connection, closeGracefully: true);
562557

563558
context.Response.StatusCode = StatusCodes.Status202Accepted;
564559
context.Response.ContentType = "text/plain";

src/SignalR/common/Http.Connections/src/Internal/TaskExtensions.cs

-27
This file was deleted.

src/SignalR/common/Shared/PipeWriterStream.cs

+2-10
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) .NET Foundation. All rights reserved.
1+
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

44
using System;
@@ -76,15 +76,7 @@ private ValueTask WriteCoreAsync(ReadOnlyMemory<byte> source, CancellationToken
7676

7777
_length += source.Length;
7878
var task = _pipeWriter.WriteAsync(source);
79-
if (task.IsCompletedSuccessfully)
80-
{
81-
// Cancellation can be triggered by PipeWriter.CancelPendingFlush
82-
if (task.Result.IsCanceled)
83-
{
84-
throw new OperationCanceledException();
85-
}
86-
}
87-
else if (!task.IsCompletedSuccessfully)
79+
if (!task.IsCompletedSuccessfully)
8880
{
8981
return WriteSlowAsync(task);
9082
}

src/SignalR/server/Core/src/HubConnectionContext.cs

-61
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ public class HubConnectionContext
3333

3434
private long _lastSendTimestamp = Stopwatch.GetTimestamp();
3535
private ReadOnlyMemory<byte> _cachedPingMessage;
36-
private volatile bool _connectionAborted;
3736

3837
/// <summary>
3938
/// Initializes a new instance of the <see cref="HubConnectionContext"/> class.
@@ -100,12 +99,6 @@ public virtual ValueTask WriteAsync(HubMessage message, CancellationToken cancel
10099
return new ValueTask(WriteSlowAsync(message));
101100
}
102101

103-
if (_connectionAborted)
104-
{
105-
_writeLock.Release();
106-
return default;
107-
}
108-
109102
// This method should never throw synchronously
110103
var task = WriteCore(message);
111104

@@ -136,12 +129,6 @@ public virtual ValueTask WriteAsync(SerializedHubMessage message, CancellationTo
136129
return new ValueTask(WriteSlowAsync(message));
137130
}
138131

139-
if (_connectionAborted)
140-
{
141-
_writeLock.Release();
142-
return default;
143-
}
144-
145132
// This method should never throw synchronously
146133
var task = WriteCore(message);
147134

@@ -171,8 +158,6 @@ private ValueTask<FlushResult> WriteCore(HubMessage message)
171158
{
172159
Log.FailedWritingMessage(_logger, ex);
173160

174-
Abort();
175-
176161
return new ValueTask<FlushResult>(new FlushResult(isCanceled: false, isCompleted: true));
177162
}
178163
}
@@ -190,8 +175,6 @@ private ValueTask<FlushResult> WriteCore(SerializedHubMessage message)
190175
{
191176
Log.FailedWritingMessage(_logger, ex);
192177

193-
Abort();
194-
195178
return new ValueTask<FlushResult>(new FlushResult(isCanceled: false, isCompleted: true));
196179
}
197180
}
@@ -205,8 +188,6 @@ private async Task CompleteWriteAsync(ValueTask<FlushResult> task)
205188
catch (Exception ex)
206189
{
207190
Log.FailedWritingMessage(_logger, ex);
208-
209-
Abort();
210191
}
211192
finally
212193
{
@@ -220,20 +201,13 @@ private async Task WriteSlowAsync(HubMessage message)
220201
await _writeLock.WaitAsync();
221202
try
222203
{
223-
if (_connectionAborted)
224-
{
225-
return;
226-
}
227-
228204
// Failed to get the lock immediately when entering WriteAsync so await until it is available
229205

230206
await WriteCore(message);
231207
}
232208
catch (Exception ex)
233209
{
234210
Log.FailedWritingMessage(_logger, ex);
235-
236-
Abort();
237211
}
238212
finally
239213
{
@@ -245,11 +219,6 @@ private async Task WriteSlowAsync(SerializedHubMessage message)
245219
{
246220
try
247221
{
248-
if (_connectionAborted)
249-
{
250-
return;
251-
}
252-
253222
// Failed to get the lock immediately when entering WriteAsync so await until it is available
254223
await _writeLock.WaitAsync();
255224

@@ -258,8 +227,6 @@ private async Task WriteSlowAsync(SerializedHubMessage message)
258227
catch (Exception ex)
259228
{
260229
Log.FailedWritingMessage(_logger, ex);
261-
262-
Abort();
263230
}
264231
finally
265232
{
@@ -283,20 +250,13 @@ private async Task TryWritePingSlowAsync()
283250
{
284251
try
285252
{
286-
if (_connectionAborted)
287-
{
288-
return;
289-
}
290-
291253
await _connectionContext.Transport.Output.WriteAsync(_cachedPingMessage);
292254

293255
Log.SentPing(_logger);
294256
}
295257
catch (Exception ex)
296258
{
297259
Log.FailedWritingMessage(_logger, ex);
298-
299-
Abort();
300260
}
301261
finally
302262
{
@@ -333,12 +293,6 @@ private async Task WriteHandshakeResponseAsync(HandshakeResponseMessage message)
333293
/// </summary>
334294
public virtual void Abort()
335295
{
336-
_connectionAborted = true;
337-
338-
// Cancel any current writes or writes that are about to happen and have already gone past the _connectionAborted bool
339-
// We have to do this outside of the lock otherwise it could hang if the write is observing backpressure
340-
_connectionContext.Transport.Output.CancelPendingFlush();
341-
342296
// If we already triggered the token then noop, this isn't thread safe but it's good enough
343297
// to avoid spawning a new task in the most common cases
344298
if (_connectionAbortedTokenSource.IsCancellationRequested)
@@ -469,24 +423,9 @@ internal void Abort(Exception exception)
469423
internal Task AbortAsync()
470424
{
471425
Abort();
472-
473-
// Acquire lock to make sure all writes are completed
474-
if (!_writeLock.Wait(0))
475-
{
476-
return AbortAsyncSlow();
477-
}
478-
479-
_writeLock.Release();
480426
return _abortCompletedTcs.Task;
481427
}
482428

483-
private async Task AbortAsyncSlow()
484-
{
485-
await _writeLock.WaitAsync();
486-
_writeLock.Release();
487-
await _abortCompletedTcs.Task;
488-
}
489-
490429
private void KeepAliveTick()
491430
{
492431
var timestamp = Stopwatch.GetTimestamp();

src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs

+1-3
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,9 @@ public async Task AbortFromHubMethodForcesClientDisconnect()
7979
{
8080
var connectionHandlerTask = await client.ConnectAsync(connectionHandler);
8181

82-
await client.SendInvocationAsync(nameof(AbortHub.Kill));
82+
await client.InvokeAsync(nameof(AbortHub.Kill));
8383

8484
await connectionHandlerTask.OrTimeout();
85-
86-
Assert.Null(client.TryRead());
8785
}
8886
}
8987

0 commit comments

Comments
 (0)